From e10dc746de9ee507777105ce298c0c36d7a598c2 Mon Sep 17 00:00:00 2001 From: Manoj Babu Katragadda Date: Tue, 24 Mar 2026 16:28:29 +0800 Subject: [PATCH] feat: Add Lance file format support via File Format API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Lance as a new Iceberg file format using the FormatModel API (introduced in PR #12774). This is the first new format added through the pluggable File Format API, validating its design for external formats. Changes: - Add LANCE to FileFormat enum - Add iceberg-lance module with LanceFormatModel, readers, writers, schema conversion, and Arrow type mapping - Add Spark 4.1 integration (ColumnarBatch + InternalRow) in the lance module to avoid Arrow relocation conflicts with the runtime shadow jar - Register Lance format models in FormatModelRegistry with auto-discovery - Exclude Lance from the Spark runtime shadow jar (JNI native code requires unrelocated Arrow classes) - 19 unit tests covering round-trip, schema preservation, projection, null handling, batch sizing, and file length Runtime classpath for Spark: --jars iceberg-spark-runtime.jar,iceberg-lance.jar,lance-core.jar, jar-jni.jar,arrow-c-data.jar Known gaps documented in lance/LANCE_SDK_GAPS.md: 1. getBytesWritten — Lance JNI discards Rust finish() return value 2. Column statistics — awaiting Lance PR #5639 Java SDK exposure 3. Split planning — needs byte-offset-to-row mapping 4. Predicate pushdown — no-op currently, correctness preserved 5. 
Name mapping — schema evolution support Co-Authored-By: Claude Opus 4.6 (1M context) --- .../java/org/apache/iceberg/FileFormat.java | 1 + build.gradle | 40 ++ .../iceberg/formats/FormatModelRegistry.java | 3 +- lance/LANCE_SDK_GAPS.md | 150 ++++++ .../iceberg/lance/GenericLanceReader.java | 76 +++ .../iceberg/lance/GenericLanceWriter.java | 47 ++ .../iceberg/lance/LanceArrowConverter.java | 309 +++++++++++ .../iceberg/lance/LanceFileAppender.java | 168 ++++++ .../iceberg/lance/LanceFormatModel.java | 489 ++++++++++++++++++ .../iceberg/lance/LanceFormatModels.java | 65 +++ .../apache/iceberg/lance/LanceSchemaUtil.java | 269 ++++++++++ .../lance/spark/ConstantColumnVector.java | 129 +++++ .../lance/spark/IcebergTypeToSparkType.java | 64 +++ .../lance/spark/SparkLanceColumnarReader.java | 94 ++++ .../lance/spark/SparkLanceFormatModels.java | 56 ++ .../lance/spark/SparkLanceRowReader.java | 169 ++++++ .../iceberg/lance/spark/SparkLanceWriter.java | 99 ++++ .../iceberg/lance/TestLanceFormatModel.java | 106 ++++ .../iceberg/lance/TestLanceRoundTrip.java | 336 ++++++++++++ .../lance/TestLanceSchemaConversion.java | 171 ++++++ settings.gradle | 2 + spark/v4.1/build.gradle | 8 + 22 files changed, 2850 insertions(+), 1 deletion(-) create mode 100644 lance/LANCE_SDK_GAPS.md create mode 100644 lance/src/main/java/org/apache/iceberg/lance/GenericLanceReader.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/GenericLanceWriter.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/LanceArrowConverter.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/LanceFileAppender.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/LanceFormatModel.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/LanceFormatModels.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/LanceSchemaUtil.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/ConstantColumnVector.java create 
mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/IcebergTypeToSparkType.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceColumnarReader.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceFormatModels.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceRowReader.java create mode 100644 lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceWriter.java create mode 100644 lance/src/test/java/org/apache/iceberg/lance/TestLanceFormatModel.java create mode 100644 lance/src/test/java/org/apache/iceberg/lance/TestLanceRoundTrip.java create mode 100644 lance/src/test/java/org/apache/iceberg/lance/TestLanceSchemaConversion.java diff --git a/api/src/main/java/org/apache/iceberg/FileFormat.java b/api/src/main/java/org/apache/iceberg/FileFormat.java index 064fc1465fa8..4f7372525eee 100644 --- a/api/src/main/java/org/apache/iceberg/FileFormat.java +++ b/api/src/main/java/org/apache/iceberg/FileFormat.java @@ -28,6 +28,7 @@ public enum FileFormat { ORC("orc", true), PARQUET("parquet", true), AVRO("avro", true), + LANCE("lance", false), METADATA("metadata.json", false); private final String ext; diff --git a/build.gradle b/build.gradle index 4cf6cf2ad9d7..90011616ebd4 100644 --- a/build.gradle +++ b/build.gradle @@ -961,6 +961,46 @@ project(':iceberg-arrow') { } } +project(':iceberg-lance') { + test { + useJUnitPlatform() + jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED' + } + dependencies { + implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') + api project(':iceberg-api') + implementation project(':iceberg-core') + implementation project(':iceberg-arrow') + + implementation(libs.arrow.vector) { + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + exclude group: 'com.google.code.findbugs', module: 'jsr305' + } + implementation(libs.arrow.memory.netty) { + exclude group: 
'com.google.code.findbugs', module: 'jsr305' + exclude group: 'io.netty', module: 'netty-common' + exclude group: 'io.netty', module: 'netty-buffer' + } + + // Lance Java SDK (external dependency) + implementation 'org.lance:lance-core:4.1.0-beta.0' + + // Spark (compileOnly — provided at runtime by Spark environment) + compileOnly("org.apache.spark:spark-hive_2.13:${libs.versions.spark41.get()}") { + exclude group: 'org.apache.arrow' + exclude group: 'org.apache.parquet' + exclude group: 'org.apache.avro', module: 'avro' + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + exclude group: 'org.roaringbitmap' + } + + testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts') + testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts') + } +} + project(':iceberg-nessie') { test { useJUnitPlatform() diff --git a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java index 4a6b5a6cf40f..435ff7f9aa52 100644 --- a/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java +++ b/core/src/main/java/org/apache/iceberg/formats/FormatModelRegistry.java @@ -59,7 +59,8 @@ private FormatModelRegistry() {} "org.apache.iceberg.data.GenericFormatModels", "org.apache.iceberg.arrow.vectorized.ArrowFormatModels", "org.apache.iceberg.flink.data.FlinkFormatModels", - "org.apache.iceberg.spark.source.SparkFormatModels"); + "org.apache.iceberg.spark.source.SparkFormatModels", + "org.apache.iceberg.lance.LanceFormatModels"); // Format models indexed by file format and object model class private static final Map>, FormatModel> MODELS = diff --git a/lance/LANCE_SDK_GAPS.md b/lance/LANCE_SDK_GAPS.md new file mode 100644 index 000000000000..e273d39a517f --- /dev/null +++ b/lance/LANCE_SDK_GAPS.md @@ -0,0 +1,150 @@ +# Lance Java SDK Gaps + +Gaps in the Lance Java SDK that affect the iceberg-lance 
integration. +These should be addressed in the Lance Java SDK, not worked around in iceberg-lance. + +## 1. File Length (getBytesWritten) + +**Current workaround:** After closing the writer, we use `OutputFile.toInputFile().getLength()` +to get the file size (same pattern as ORC's FileAppender). + +**What Lance already has:** The Rust `FileWriter::finish()` returns `Result` — the total +bytes written. But the JNI layer (`file_writer.rs`) discards this value in `closeNative`: + + Ok(_) => {} // discards the u64 + +And the Java `close()` returns void. + +**Proposed fix in Lance Java SDK:** +- JNI: Change `closeNative` to return `jlong` with the bytes written +- Java: Store the value in a field, add `public long getBytesWritten()` getter +- Non-breaking: `close()` stays void, `AutoCloseable` contract preserved + +## 2. Column Statistics (min/max/null_count per column) + +**Current state:** `LanceFileAppender.metrics()` only returns `recordCount`. All other +fields (columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds) are null. +This means Iceberg cannot do file-level pruning on Lance data files. + +**What Lance is building:** PR #5639 (Column Statistics MVP) adds per-fragment min/max, +null count, and NaN count to the Rust core. Stored as Arrow IPC in the file's global +buffer. Design discussion: https://github.com/lancedb/lance/discussions/4540 + +**What's missing:** PR #5639 does not touch the Java SDK. Once merged, the JNI layer +and Java SDK need new APIs to read column statistics back — something like +`LanceFileReader.getColumnStatistics()`. + +**Proposed fix in Lance Java SDK (after Rust PR #5639 merges):** +- JNI: Expose column statistics from the global buffer via a new native method +- Java: Add `getColumnStatistics()` to LanceFileReader returning per-column min/max/null_count +- iceberg-lance: Map those stats to Iceberg's Metrics object using Conversions.toByteBuffer() + +## 3. 
Split Planning (Parallel Reads) + +**Current state:** `FileFormat.LANCE("lance", false)` — splittable is set to false. +`LanceFileAppender.splitOffsets()` returns null. This means each Lance file is read by +a single task. No within-file parallelism. + +**Why this matters:** For MPP engines like Spark, parallelism comes from task count. +Without splits, one 512MB Lance file = one task = one core. With splits, that same file +could be 10 tasks across 10 cores. + +**Approach (depends on Gap #1 — getBytesWritten):** + +During writing, after each batch flush in LanceFileAppender, record the cumulative bytes +written so far (from `getBytesWritten()`) and the corresponding row number. Store this +mapping in the Lance file's schema metadata via `addSchemaMetadata()`: + + "lance:split:0" → "0" // byte offset 0 → row 0 + "lance:split:34500000" → "50000" // byte offset 34.5MB → row 50000 + "lance:split:69200000" → "100000" // byte offset 69.2MB → row 100000 + +Write side: +- `splitOffsets()` returns the byte offsets: [0, 34500000, 69200000] +- Iceberg stores these in the DataFile manifest entry +- All Iceberg math (estimatedRowsCount, split validation) works correctly + because these are real byte positions + +Read side: +- Iceberg calls `ReadBuilder.split(start=34500000, length=34700000)` +- Our reader opens the Lance file, reads the metadata map +- Looks up byte offset 34500000 → row 50000 +- Looks up next split offset 69200000 → row 100000 +- Calls `readAll(projectedNames, List.of(new Range(50000, 100000)), batchSize)` +- Lance natively supports row-range reads via its page table + +Changes needed: +- Lance Java SDK: expose `getBytesWritten()` (same as Gap #1) +- iceberg-lance: Change `FileFormat.LANCE("lance", true)` — set splittable to true +- iceberg-lance: LanceFileAppender — record byte-offset-to-row mapping after each flush +- iceberg-lance: LanceFileAppender.splitOffsets() — return byte offset list +- iceberg-lance: ReadBuilderWrapper.split() — store start/length, 
translate to row range on build() + +**Why row numbers as offsets won't work:** +Iceberg's `estimatedRowsCount()` in BaseContentScanTask computes: + length / (fileSizeInBytes - firstOffset) * recordCount +This assumes offsets and length are byte positions. Using row numbers (e.g., 50000) +with fileSizeInBytes (e.g., 50000000) gives a fraction of 0.001 instead of ~0.33, +producing wildly wrong estimates that break Spark's task scheduling. + +**Why proportional byte-to-row translation won't work:** +Lance is columnar — bytes are organized by columns, not rows. The first 30% of a file's +bytes does not correspond to the first 30% of rows. Proportional math +(offset / fileSizeInBytes * totalRows) is fundamentally inaccurate for columnar formats. + +## 4. Predicate Pushdown (filter) + +**Current state:** `ReadBuilderWrapper.filter()` is a no-op. The filter expression is +accepted but never applied. All rows are read and filtering happens in Iceberg's residual +evaluator after the fact. + +**Why this matters:** Without pushdown, Lance reads all rows from disk even when a query +has a WHERE clause that could eliminate most of them. For a query like +`SELECT * FROM t WHERE id = 42`, Lance would read all rows instead of using its scalar +index or page-level filtering. + +**What Lance supports:** Lance has native predicate pushdown through its scan API: +- Scalar indexes (BTREE, BITMAP) for point lookups and range queries +- Zone map filtering at the page level +- Filter expressions passed to the reader + +**What's needed in iceberg-lance:** +- Translate Iceberg's `Expression` (from `ReadBuilder.filter()`) to Lance's filter format +- Pass the translated filter to `readAll()` or use Lance's scan API with filter support +- This is purely iceberg-lance work — no Lance SDK changes needed + +**Priority:** Medium. Not blocking for initial contribution since Iceberg applies residual +filters anyway (correctness is preserved). But important for production performance. + +## 5. 
Name Mapping (Schema Evolution) + +**Current state:** `ReadBuilderWrapper.withNameMapping()` is a no-op. The NameMapping +is accepted but never used. + +**Why this matters:** Iceberg supports schema evolution — columns can be renamed, added, +or reordered. When reading files written with an older schema, Iceberg uses NameMapping +to resolve column identity by field ID rather than column name. Without this, reading +files written before a column rename would fail or return wrong data. + +**What's needed in iceberg-lance:** +- When a NameMapping is provided, use it to resolve column names during schema conversion + in LanceSchemaUtil +- Map Iceberg field IDs to the column names used in the Lance file, regardless of what + the current schema calls them +- This relies on the `PARQUET:field_id` metadata that LanceSchemaUtil already writes + into Arrow schema fields + +**Priority:** Low for initial contribution. Becomes important when users start evolving +schemas on existing Lance-backed Iceberg tables. 
+ +## References + +- Lance PR #5639: Column Statistics MVP — https://github.com/lancedb/lance/pull/5639 +- Lance Discussion #4540: Column Statistics Design — https://github.com/lancedb/lance/discussions/4540 +- Lance Issue #5857: Post-MVP Improvements — https://github.com/lancedb/lance/issues/5857 +- Lance JNI file_writer.rs: closeNative discards finish() return value +- Iceberg BaseContentScanTask.java — split planning logic, estimatedRowsCount() +- Iceberg OffsetsAwareSplitScanTaskIterator.java — splits tasks by offset boundaries +- Iceberg BaseFile.java lines 551-553 — split offset validation +- Iceberg GenericReader.java — calls ReadBuilder.split(task.start(), task.length()) +- Lance LanceFileReader.readAll() — supports row-range reads via Range parameter diff --git a/lance/src/main/java/org/apache/iceberg/lance/GenericLanceReader.java b/lance/src/main/java/org/apache/iceberg/lance/GenericLanceReader.java new file mode 100644 index 000000000000..8c5f387e8888 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/GenericLanceReader.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.lance; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.CloseableIterator; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +/** + * Reader function for the generic Iceberg {@link Record} data model. + * + *

Converts Arrow VectorSchemaRoot batches from Lance files into Iceberg Records. + */ +public class GenericLanceReader { + + private GenericLanceReader() {} + + /** + * Creates a function that converts Arrow batches into CloseableIterables of Iceberg Records. + * + * @param icebergSchema the Iceberg schema to construct Records with + * @param arrowSchema the Arrow schema of the Lance file (unused, schema comes from batch) + * @param idToConstant constant values to inject (e.g., partition values) + * @return a function that takes a (batch, idToConstant) Entry and produces Records + */ + public static Function>, CloseableIterable> + buildReader( + Schema icebergSchema, + org.apache.arrow.vector.types.pojo.Schema arrowSchema, + Map idToConstant) { + return entry -> { + VectorSchemaRoot batch = entry.getKey(); + Map constants = entry.getValue(); + + List records = Lists.newArrayListWithCapacity(batch.getRowCount()); + for (int i = 0; i < batch.getRowCount(); i++) { + records.add(LanceArrowConverter.readRow(batch, i, icebergSchema, constants)); + } + + return new CloseableIterable() { + @Override + public CloseableIterator iterator() { + return CloseableIterator.withClose(records.iterator()); + } + + @Override + public void close() throws IOException { + // batch lifecycle managed by ArrowReader + } + }; + }; + } +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/GenericLanceWriter.java b/lance/src/main/java/org/apache/iceberg/lance/GenericLanceWriter.java new file mode 100644 index 000000000000..14640e399603 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/GenericLanceWriter.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.lance;

import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

/**
 * Writer function for the generic Iceberg {@link Record} data model.
 *
 * <p>Turns each Iceberg Record into a Map keyed by column name, the intermediate representation
 * consumed by the Lance Arrow conversion pipeline.
 */
public class GenericLanceWriter {

  private GenericLanceWriter() {}

  /**
   * Creates a function that converts Iceberg Records to a Map representation.
   *
   * @param icebergSchema the Iceberg schema defining the columns
   * @param arrowSchema the corresponding Arrow schema (unused for generic records)
   * @return a function mapping Records to column name/value Maps
   */
  public static Function<Record, Map<String, Object>> create(
      Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
    Function<Record, Map<String, Object>> toColumnMap =
        rec -> LanceArrowConverter.recordToMap(rec, icebergSchema);
    return toColumnMap;
  }
}
+ */ +package org.apache.iceberg.lance; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.time.temporal.ChronoUnit; +import java.util.Map; +import java.util.UUID; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.TimeMicroVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.types.Types; + +/** + * Converts between Iceberg Records and Arrow VectorSchemaRoot batches. + * + *

Handles the type-level conversion for writing Iceberg records into Arrow vectors and reading + * Arrow vectors back into Iceberg records. + */ +class LanceArrowConverter { + + private LanceArrowConverter() {} + + /** + * Write a single row of values into the Arrow VectorSchemaRoot at the given row index. + * + * @param batch the target Arrow batch + * @param rowIndex the row index to write at + * @param values map of column name to value + * @param schema the Iceberg schema defining the columns + */ + static void writeRow( + VectorSchemaRoot batch, int rowIndex, Map values, Schema schema) { + for (Types.NestedField field : schema.columns()) { + Object value = values.get(field.name()); + if (value == null) { + // Arrow vectors default to null, no action needed + continue; + } + writeValue(batch, field, rowIndex, value); + } + } + + /** + * Read a single row from the Arrow VectorSchemaRoot and return it as an Iceberg Record. + * + * @param batch the source Arrow batch + * @param rowIndex the row index to read from + * @param schema the Iceberg schema to construct the Record with + * @param idToConstant constant values to inject for non-data columns (e.g., partition values) + * @return an Iceberg Record containing the row's data + */ + static Record readRow( + VectorSchemaRoot batch, int rowIndex, Schema schema, Map idToConstant) { + GenericRecord record = GenericRecord.create(schema); + for (Types.NestedField field : schema.columns()) { + if (idToConstant != null && idToConstant.containsKey(field.fieldId())) { + record.setField(field.name(), idToConstant.get(field.fieldId())); + continue; + } + + if (batch.getVector(field.name()) == null) { + record.setField(field.name(), null); + continue; + } + + Object value = readValue(batch, field.name(), rowIndex, field); + record.setField(field.name(), value); + } + return record; + } + + /** Convert an Iceberg Record to a Map of column name to value, suitable for writeRow(). 
*/ + static Map recordToMap(Record record, Schema schema) { + Map map = Maps.newHashMap(); + for (Types.NestedField field : schema.columns()) { + map.put(field.name(), record.getField(field.name())); + } + return map; + } + + private static void writeValue( + VectorSchemaRoot batch, Types.NestedField field, int rowIndex, Object value) { + if (batch.getVector(field.name()) == null) { + return; + } + + String vectorName = field.name(); + switch (field.type().typeId()) { + case BOOLEAN: + ((BitVector) batch.getVector(vectorName)).setSafe(rowIndex, (Boolean) value ? 1 : 0); + break; + case INTEGER: + ((IntVector) batch.getVector(vectorName)).setSafe(rowIndex, (Integer) value); + break; + case LONG: + ((BigIntVector) batch.getVector(vectorName)).setSafe(rowIndex, (Long) value); + break; + case FLOAT: + ((Float4Vector) batch.getVector(vectorName)).setSafe(rowIndex, (Float) value); + break; + case DOUBLE: + ((Float8Vector) batch.getVector(vectorName)).setSafe(rowIndex, (Double) value); + break; + case DECIMAL: + BigDecimal decimal = (BigDecimal) value; + ((DecimalVector) batch.getVector(vectorName)).setSafe(rowIndex, decimal); + break; + default: + writeTemporalOrBinaryValue(batch, vectorName, field, rowIndex, value); + break; + } + } + + private static void writeTemporalOrBinaryValue( + VectorSchemaRoot batch, + String vectorName, + Types.NestedField field, + int rowIndex, + Object value) { + switch (field.type().typeId()) { + case DATE: + int daysSinceEpoch; + if (value instanceof LocalDate) { + daysSinceEpoch = (int) ((LocalDate) value).toEpochDay(); + } else { + daysSinceEpoch = (Integer) value; + } + ((DateDayVector) batch.getVector(vectorName)).setSafe(rowIndex, daysSinceEpoch); + break; + case TIME: + long timeMicros; + if (value instanceof LocalTime) { + timeMicros = ((LocalTime) value).getLong(java.time.temporal.ChronoField.MICRO_OF_DAY); + } else { + timeMicros = (Long) value; + } + ((TimeMicroVector) batch.getVector(vectorName)).setSafe(rowIndex, timeMicros); + 
break; + case TIMESTAMP: + long tsMicros = toTimestampMicros(value); + if (((Types.TimestampType) field.type()).shouldAdjustToUTC()) { + ((TimeStampMicroTZVector) batch.getVector(vectorName)).setSafe(rowIndex, tsMicros); + } else { + ((TimeStampMicroVector) batch.getVector(vectorName)).setSafe(rowIndex, tsMicros); + } + break; + default: + writeStringOrBinaryValue(batch, vectorName, field, rowIndex, value); + break; + } + } + + private static void writeStringOrBinaryValue( + VectorSchemaRoot batch, + String vectorName, + Types.NestedField field, + int rowIndex, + Object value) { + switch (field.type().typeId()) { + case STRING: + byte[] strBytes = value.toString().getBytes(StandardCharsets.UTF_8); + ((VarCharVector) batch.getVector(vectorName)) + .setSafe(rowIndex, strBytes, 0, strBytes.length); + break; + case UUID: + UUID uuid = (UUID) value; + ByteBuffer uuidBuf = ByteBuffer.allocate(16); + uuidBuf.putLong(uuid.getMostSignificantBits()); + uuidBuf.putLong(uuid.getLeastSignificantBits()); + ((FixedSizeBinaryVector) batch.getVector(vectorName)).setSafe(rowIndex, uuidBuf.array()); + break; + case FIXED: + byte[] fixedBytes = toByteArray(value); + ((FixedSizeBinaryVector) batch.getVector(vectorName)).setSafe(rowIndex, fixedBytes); + break; + case BINARY: + byte[] binBytes = toByteArray(value); + ((VarBinaryVector) batch.getVector(vectorName)) + .setSafe(rowIndex, binBytes, 0, binBytes.length); + break; + default: + throw new UnsupportedOperationException("Unsupported write type: " + field.type().typeId()); + } + } + + private static byte[] toByteArray(Object value) { + if (value instanceof ByteBuffer) { + ByteBuffer buf = (ByteBuffer) value; + byte[] bytes = new byte[buf.remaining()]; + buf.duplicate().get(bytes); + return bytes; + } + return (byte[]) value; + } + + private static Object readValue( + VectorSchemaRoot batch, String vectorName, int rowIndex, Types.NestedField field) { + if (batch.getVector(vectorName).isNull(rowIndex)) { + return null; + } + + switch 
(field.type().typeId()) { + case BOOLEAN: + return ((BitVector) batch.getVector(vectorName)).get(rowIndex) == 1; + case INTEGER: + return ((IntVector) batch.getVector(vectorName)).get(rowIndex); + case LONG: + return ((BigIntVector) batch.getVector(vectorName)).get(rowIndex); + case FLOAT: + return ((Float4Vector) batch.getVector(vectorName)).get(rowIndex); + case DOUBLE: + return ((Float8Vector) batch.getVector(vectorName)).get(rowIndex); + case DECIMAL: + return ((DecimalVector) batch.getVector(vectorName)).getObject(rowIndex); + default: + return readTemporalOrBinaryValue(batch, vectorName, rowIndex, field); + } + } + + private static Object readTemporalOrBinaryValue( + VectorSchemaRoot batch, String vectorName, int rowIndex, Types.NestedField field) { + switch (field.type().typeId()) { + case DATE: + int days = ((DateDayVector) batch.getVector(vectorName)).get(rowIndex); + return LocalDate.ofEpochDay(days); + case TIME: + long micros = ((TimeMicroVector) batch.getVector(vectorName)).get(rowIndex); + return LocalTime.ofNanoOfDay(micros * 1000); + case TIMESTAMP: + long tsMicros; + if (((Types.TimestampType) field.type()).shouldAdjustToUTC()) { + tsMicros = ((TimeStampMicroTZVector) batch.getVector(vectorName)).get(rowIndex); + } else { + tsMicros = ((TimeStampMicroVector) batch.getVector(vectorName)).get(rowIndex); + } + return fromTimestampMicros(tsMicros); + case STRING: + return new String( + ((VarCharVector) batch.getVector(vectorName)).get(rowIndex), StandardCharsets.UTF_8); + case UUID: + byte[] uuidBytes = ((FixedSizeBinaryVector) batch.getVector(vectorName)).get(rowIndex); + ByteBuffer uuidBuf = ByteBuffer.wrap(uuidBytes); + return new UUID(uuidBuf.getLong(), uuidBuf.getLong()); + case FIXED: + return ByteBuffer.wrap(((FixedSizeBinaryVector) batch.getVector(vectorName)).get(rowIndex)); + case BINARY: + return ByteBuffer.wrap(((VarBinaryVector) batch.getVector(vectorName)).get(rowIndex)); + default: + throw new UnsupportedOperationException("Unsupported 
read type: " + field.type().typeId()); + } + } + + private static long toTimestampMicros(Object value) { + if (value instanceof OffsetDateTime) { + OffsetDateTime odt = (OffsetDateTime) value; + Instant instant = odt.toInstant(); + return ChronoUnit.MICROS.between(Instant.EPOCH, instant); + } else if (value instanceof Long) { + return (Long) value; + } else { + throw new UnsupportedOperationException("Cannot convert to timestamp micros: " + value); + } + } + + private static OffsetDateTime fromTimestampMicros(long micros) { + long seconds = Math.floorDiv(micros, 1_000_000); + long nanoAdjustment = Math.floorMod(micros, 1_000_000) * 1000L; + return OffsetDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanoAdjustment), ZoneOffset.UTC); + } +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/LanceFileAppender.java b/lance/src/main/java/org/apache/iceberg/lance/LanceFileAppender.java new file mode 100644 index 000000000000..7013fd7d7ff3 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceFileAppender.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg.lance;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.lance.file.LanceFileWriter;

/**
 * A {@link FileAppender} implementation that writes data to Lance files.
 *
 * <p>This appender bridges Iceberg's record-at-a-time {@code add(D)} interface with Lance's
 * batch-oriented {@code write(VectorSchemaRoot)} API. Records are accumulated in an Arrow
 * {@link VectorSchemaRoot} buffer and flushed to the Lance writer when the batch is full.
 *
 * <p>The Iceberg schema JSON is stored in the Lance file's schema metadata under
 * {@code LanceSchemaUtil.ICEBERG_SCHEMA_KEY} so readers can recover exact Iceberg types.
 *
 * @param <D> the data type being written
 */
public class LanceFileAppender<D> implements FileAppender<D> {
  private final LanceFileWriter writer;
  private final BufferAllocator allocator;
  // converts an engine record into a column-name -> value map used to populate Arrow vectors
  private final Function<D, Map<String, Object>> recordToMap;
  private final org.apache.iceberg.Schema icebergSchema;
  private final OutputFile outputFile;
  private final int batchSize;

  private VectorSchemaRoot currentBatch;
  private int currentRowIndex; // next free row slot in currentBatch
  private long totalRecordCount;
  private long estimatedFlushedBytes; // running total of flushed batch buffer sizes, for length()
  private boolean closed;

  /**
   * @param path file location passed to the Lance native writer
   * @param outputFile Iceberg output file, used to obtain the exact length after close
   * @param icebergSchema write schema; its JSON is embedded in the Lance file metadata
   * @param arrowSchema Arrow equivalent of {@code icebergSchema}, used for the batch buffer
   * @param recordToMap record-to-column-map converter
   * @param metadata user key/value metadata to store alongside the Iceberg schema JSON
   * @param storageOptions object-store options (s3.*, gcs.*, ...) forwarded to Lance
   * @param overwrite whether an existing file may be replaced
   * @param batchSize number of rows buffered before each flush to the Lance writer
   * @throws IOException if the Lance writer cannot be opened
   */
  LanceFileAppender(
      String path,
      OutputFile outputFile,
      org.apache.iceberg.Schema icebergSchema,
      Schema arrowSchema,
      Function<D, Map<String, Object>> recordToMap,
      Map<String, String> metadata,
      Map<String, String> storageOptions,
      boolean overwrite,
      int batchSize)
      throws IOException {
    // NOTE(review): 'overwrite' is accepted but never used — confirm whether
    // LanceFileWriter.open overwrites existing files by default.
    this.allocator = new RootAllocator(Long.MAX_VALUE);
    this.icebergSchema = icebergSchema;
    this.outputFile = outputFile;
    this.recordToMap = recordToMap;
    this.batchSize = batchSize;
    this.currentRowIndex = 0;
    this.totalRecordCount = 0;
    this.estimatedFlushedBytes = 0;
    this.closed = false;

    DictionaryProvider dictProvider = new DictionaryProvider.MapDictionaryProvider();
    this.writer = LanceFileWriter.open(path, allocator, dictProvider, storageOptions);

    // Store Iceberg schema JSON and any user metadata in the Lance file
    Map<String, String> allMetadata = Maps.newHashMap(metadata);
    allMetadata.put(
        LanceSchemaUtil.ICEBERG_SCHEMA_KEY, org.apache.iceberg.SchemaParser.toJson(icebergSchema));
    writer.addSchemaMetadata(allMetadata);

    this.currentBatch = VectorSchemaRoot.create(arrowSchema, allocator);
    this.currentBatch.allocateNew();
  }

  /** Buffers one record; flushes the Arrow batch to Lance once {@code batchSize} rows accumulate. */
  @Override
  public void add(D datum) {
    Map<String, Object> values = recordToMap.apply(datum);
    LanceArrowConverter.writeRow(currentBatch, currentRowIndex, values, icebergSchema);
    currentRowIndex++;
    totalRecordCount++;

    if (currentRowIndex >= batchSize) {
      flushBatch();
    }
  }

  /**
   * Returns record-count-only metrics. Column-level statistics are not yet available from the
   * Lance Java SDK (see LANCE_SDK_GAPS.md).
   */
  @Override
  public Metrics metrics() {
    return new Metrics(totalRecordCount, null, null, null, null);
  }

  /**
   * Returns the exact file length after close; before close, an estimate based on Arrow buffer
   * sizes (the Lance JNI layer does not report bytes written — see LANCE_SDK_GAPS.md).
   */
  @Override
  public long length() {
    if (closed) {
      return outputFile.toInputFile().getLength();
    }

    // Estimate: flushed bytes so far + current unflushed batch buffer sizes
    long currentBatchBytes = 0;
    for (FieldVector vector : currentBatch.getFieldVectors()) {
      currentBatchBytes += vector.getBufferSize();
    }
    return estimatedFlushedBytes + currentBatchBytes;
  }

  /** Lance does not support byte-range split planning yet, so no offsets are reported. */
  @Override
  public List<Long> splitOffsets() {
    return null;
  }

  /**
   * Flushes any buffered rows, closes the Lance writer, and releases Arrow memory. Idempotent:
   * subsequent calls are no-ops. The batch and allocator are released even if the writer fails to
   * close.
   */
  @Override
  public void close() throws IOException {
    if (!closed) {
      try {
        if (currentRowIndex > 0) {
          flushBatch();
        } else if (totalRecordCount == 0) {
          // Lance requires at least one batch to establish the schema.
          // Write an empty batch with row count 0.
          currentBatch.setRowCount(0);
          writer.write(currentBatch);
        }
        writer.close();
      } catch (Exception e) {
        throw new IOException("Failed to close Lance writer", e);
      } finally {
        currentBatch.close();
        allocator.close();
        closed = true;
      }
    }
  }

  /** Writes the buffered rows to Lance and resets the batch buffer for the next rows. */
  private void flushBatch() {
    try {
      currentBatch.setRowCount(currentRowIndex);
      // Accumulate buffer sizes before clearing for length() estimates
      for (FieldVector vector : currentBatch.getFieldVectors()) {
        estimatedFlushedBytes += vector.getBufferSize();
      }
      writer.write(currentBatch);
      currentBatch.clear();
      currentBatch.allocateNew();
      currentRowIndex = 0;
    } catch (IOException e) {
      throw new java.io.UncheckedIOException("Failed to flush batch to Lance writer", e);
    }
  }
}
package org.apache.iceberg.lance;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Function;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.formats.BaseFormatModel;
import org.apache.iceberg.formats.ModelWriteBuilder;
import org.apache.iceberg.formats.ReadBuilder;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.DeleteSchemaUtil;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.lance.file.LanceFileReader;

/**
 * {@link org.apache.iceberg.formats.FormatModel} implementation for the Lance file format.
 *
 * <p>Lance is a columnar data format optimized for AI/ML workloads, featuring fast random access,
 * native vector search, and an Arrow-native data model. This format model bridges Lance's
 * file-level reader/writer APIs with Iceberg's FormatModel abstraction.
 *
 * <p>The file schema type {@code F} is {@link org.apache.arrow.vector.types.pojo.Schema} (Arrow
 * Schema), since Lance is natively Arrow-based.
 *
 * @param <D> the data type being read/written (e.g., Record for generic, InternalRow for Spark)
 * @param <S> the engine schema type (e.g., Void for generic)
 */
public class LanceFormatModel<D, S>
    extends BaseFormatModel<
        D,
        S,
        Function<D, Map<String, Object>>,
        Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>,
        org.apache.arrow.vector.types.pojo.Schema> {

  private static final int DEFAULT_BATCH_SIZE = 1024;
  private static final String BATCH_SIZE_PROP = "lance.batch-size";

  /**
   * Creates a LanceFormatModel for position deletes. Position deletes use a fixed schema (file path
   * + position) and the generic writer.
   */
  @SuppressWarnings("unchecked")
  public static LanceFormatModel<PositionDelete<?>, Void> forPositionDeletes() {
    return new LanceFormatModel<>(
        (Class<PositionDelete<?>>) (Class) PositionDelete.class, Void.class, null, null);
  }

  /**
   * Creates a LanceFormatModel for the given data and schema types.
   *
   * @param type the data type class
   * @param schemaType the engine schema type class
   * @param writerFunction function that converts data records to Maps for Arrow writing
   * @param readerFunction function that creates a CloseableIterable from Arrow batches
   */
  public static <D, S> LanceFormatModel<D, S> create(
      Class<D> type,
      Class<S> schemaType,
      WriterFunction<Function<D, Map<String, Object>>, S, org.apache.arrow.vector.types.pojo.Schema>
          writerFunction,
      ReaderFunction<
              Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>,
              S,
              org.apache.arrow.vector.types.pojo.Schema>
          readerFunction) {
    return new LanceFormatModel<>(type, schemaType, writerFunction, readerFunction);
  }

  private LanceFormatModel(
      Class<D> type,
      Class<S> schemaType,
      WriterFunction<Function<D, Map<String, Object>>, S, org.apache.arrow.vector.types.pojo.Schema>
          writerFunction,
      ReaderFunction<
              Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>,
              S,
              org.apache.arrow.vector.types.pojo.Schema>
          readerFunction) {
    super(type, schemaType, writerFunction, readerFunction);
  }

  @Override
  public FileFormat format() {
    return FileFormat.LANCE;
  }

  @Override
  public ModelWriteBuilder<D, S> writeBuilder(EncryptedOutputFile outputFile) {
    return new WriteBuilderWrapper<>(outputFile, writerFunction());
  }

  @Override
  public ReadBuilder<D, S> readBuilder(InputFile inputFile) {
    return new ReadBuilderWrapper<>(inputFile, readerFunction());
  }

  /**
   * Extracts object-store properties forwarded to the Lance native layer. Shared by the write and
   * read builders so both recognize the same prefixes (s3.*, gcs.*, azure.*, lance.storage.*).
   */
  private static Map<String, String> extractStorageOptions(Map<String, String> config) {
    Map<String, String> storageOptions = Maps.newHashMap();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith("s3.")
          || key.startsWith("gcs.")
          || key.startsWith("azure.")
          || key.startsWith("lance.storage.")) {
        storageOptions.put(key, entry.getValue());
      }
    }
    return storageOptions;
  }

  /**
   * Parses {@code lance.batch-size} from config, validating that it is a positive integer.
   *
   * @throws IllegalArgumentException if the property is set but not a positive integer
   */
  private static int parseBatchSize(Map<String, String> config) {
    String value = config.get(BATCH_SIZE_PROP);
    if (value == null) {
      return DEFAULT_BATCH_SIZE;
    }

    try {
      int parsed = Integer.parseInt(value);
      Preconditions.checkArgument(parsed > 0, "%s must be positive: %s", BATCH_SIZE_PROP, parsed);
      return parsed;
    } catch (NumberFormatException e) {
      throw new IllegalArgumentException("Invalid " + BATCH_SIZE_PROP + ": " + value, e);
    }
  }

  // ---------------------------------------------------------------------------
  // WriteBuilder
  // ---------------------------------------------------------------------------

  private static class WriteBuilderWrapper<D, S> implements ModelWriteBuilder<D, S> {
    private final EncryptedOutputFile outputFile;
    private final WriterFunction<
            Function<D, Map<String, Object>>, S, org.apache.arrow.vector.types.pojo.Schema>
        writerFunction;
    private Schema schema;
    private S engineSchema;
    private FileContent content;
    private final Map<String, String> metadata = Maps.newHashMap();
    private final Map<String, String> config = Maps.newHashMap();
    private boolean overwrite;

    WriteBuilderWrapper(
        EncryptedOutputFile outputFile,
        WriterFunction<
                Function<D, Map<String, Object>>, S, org.apache.arrow.vector.types.pojo.Schema>
            writerFunction) {
      this.outputFile = outputFile;
      this.writerFunction = writerFunction;
    }

    @Override
    public ModelWriteBuilder<D, S> schema(Schema newSchema) {
      this.schema = newSchema;
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> engineSchema(S newSchema) {
      this.engineSchema = newSchema;
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> set(String property, String value) {
      config.put(property, value);
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> meta(String property, String value) {
      metadata.put(property, value);
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> content(FileContent newContent) {
      this.content = newContent;
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> metricsConfig(MetricsConfig metricsConfig) {
      // Lance metrics are tracked in Java; MetricsConfig is noted but not yet used
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> overwrite() {
      this.overwrite = true;
      return this;
    }

    @Override
    public ModelWriteBuilder<D, S> withFileEncryptionKey(ByteBuffer encryptionKey) {
      throw new UnsupportedOperationException("Lance does not support file encryption");
    }

    @Override
    public ModelWriteBuilder<D, S> withAADPrefix(ByteBuffer aadPrefix) {
      throw new UnsupportedOperationException("Lance does not support file encryption");
    }

    /**
     * Builds the appender. Position deletes use the fixed path/pos schema and a built-in
     * converter; all other content uses the configured writer function.
     */
    @Override
    public FileAppender<D> build() throws IOException {
      Schema writeSchema;
      if (content == FileContent.POSITION_DELETES) {
        writeSchema = DeleteSchemaUtil.pathPosSchema();
      } else {
        writeSchema = schema;
      }

      org.apache.arrow.vector.types.pojo.Schema arrowSchema =
          LanceSchemaUtil.icebergToArrow(writeSchema);

      Function<D, Map<String, Object>> recordConverter;
      if (content == FileContent.POSITION_DELETES) {
        recordConverter = positionDeleteConverter();
      } else {
        recordConverter = writerFunction.write(writeSchema, arrowSchema, engineSchema);
      }

      String path = outputFile.encryptingOutputFile().location();

      return new LanceFileAppender<>(
          path,
          outputFile.encryptingOutputFile(),
          writeSchema,
          arrowSchema,
          recordConverter,
          metadata,
          extractStorageOptions(config),
          overwrite,
          parseBatchSize(config));
    }

    /** Maps a PositionDelete to the fixed path/pos column names of the delete schema. */
    @SuppressWarnings("unchecked")
    private Function<D, Map<String, Object>> positionDeleteConverter() {
      return datum -> {
        PositionDelete<?> posDelete = (PositionDelete<?>) datum;
        Map<String, Object> map = Maps.newHashMap();
        map.put("file_path", posDelete.path().toString());
        map.put("pos", posDelete.pos());
        return map;
      };
    }
  }

  // ---------------------------------------------------------------------------
  // ReadBuilder
  // ---------------------------------------------------------------------------

  private static class ReadBuilderWrapper<D, S> implements ReadBuilder<D, S> {
    private final InputFile inputFile;
    private final ReaderFunction<
            Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>,
            S,
            org.apache.arrow.vector.types.pojo.Schema>
        readerFunction;
    private Schema projectedSchema;
    private S engineSchema;
    private Map<Integer, ?> idToConstant = Collections.emptyMap();
    private int batchSize = DEFAULT_BATCH_SIZE;
    private final Map<String, String> config = Maps.newHashMap();

    ReadBuilderWrapper(
        InputFile inputFile,
        ReaderFunction<
                Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>,
                S,
                org.apache.arrow.vector.types.pojo.Schema>
            readerFunction) {
      this.inputFile = inputFile;
      this.readerFunction = readerFunction;
    }

    @Override
    public ReadBuilder<D, S> split(long newStart, long newLength) {
      // Lance does not support byte-range splits; ignore
      return this;
    }

    @Override
    public ReadBuilder<D, S> project(Schema schema) {
      this.projectedSchema = schema;
      return this;
    }

    @Override
    public ReadBuilder<D, S> engineProjection(S schema) {
      this.engineSchema = schema;
      return this;
    }

    @Override
    public ReadBuilder<D, S> caseSensitive(boolean caseSensitive) {
      // Lance column names are case-sensitive by default; no toggle available
      return this;
    }

    @Override
    public ReadBuilder<D, S> filter(Expression filter) {
      // No file-level filter pushdown in Lance file reader (only dataset scanner supports it)
      return this;
    }

    @Override
    public ReadBuilder<D, S> set(String key, String value) {
      config.put(key, value);
      return this;
    }

    @Override
    public ReadBuilder<D, S> reuseContainers() {
      // Arrow vectors naturally manage memory; no explicit reuse needed
      return this;
    }

    @Override
    public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
      this.batchSize = numRowsPerBatch;
      return this;
    }

    @Override
    public ReadBuilder<D, S> idToConstant(Map<Integer, ?> newIdToConstant) {
      this.idToConstant = newIdToConstant;
      return this;
    }

    @Override
    public ReadBuilder<D, S> withNameMapping(NameMapping nameMapping) {
      // Name mapping not yet supported for Lance
      return this;
    }

    /**
     * Builds a CloseableIterable over the Lance file. Each call to {@code iterator()} opens a new
     * native reader; all opened iterators are tracked and released when the iterable is closed.
     */
    @Override
    public CloseableIterable<D> build() {
      String path = inputFile.location();
      Map<String, String> storageOptions = extractStorageOptions(config);

      List<String> projectedColumns = null;
      org.apache.arrow.vector.types.pojo.Schema arrowSchema = null;
      if (projectedSchema != null) {
        projectedColumns = LanceSchemaUtil.columnNames(projectedSchema);
        arrowSchema = LanceSchemaUtil.icebergToArrow(projectedSchema);
      }

      final List<String> columns = projectedColumns;
      final org.apache.arrow.vector.types.pojo.Schema finalArrowSchema = arrowSchema;
      final Schema readSchema = projectedSchema;
      final Map<Integer, ?> constants = idToConstant;
      final int readBatchSize = batchSize;

      Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>> readerFunc =
          readerFunction.read(readSchema, finalArrowSchema, engineSchema, constants);

      return new CloseableIterable<D>() {
        // tracks every iterator handed out so close() releases native readers and allocators
        private final CloseableGroup openedIterators = new CloseableGroup();

        @Override
        public CloseableIterator<D> iterator() {
          try {
            LanceCloseableIterator<D> iterator =
                new LanceCloseableIterator<>(
                    path, storageOptions, columns, readBatchSize, constants, readerFunc);
            openedIterators.addCloseable(iterator);
            return iterator;
          } catch (IOException e) {
            throw new java.io.UncheckedIOException("Failed to open Lance file reader", e);
          }
        }

        @Override
        public void close() throws IOException {
          openedIterators.close();
        }
      };
    }
  }

  // ---------------------------------------------------------------------------
  // Iterator
  // ---------------------------------------------------------------------------

  /**
   * Streams records out of a Lance file batch by batch. Owns the native reader, the Arrow reader,
   * and the allocator; per-batch iterators are closed before advancing to the next batch.
   */
  private static class LanceCloseableIterator<D> implements CloseableIterator<D> {
    private final BufferAllocator allocator;
    private final LanceFileReader fileReader;
    private final ArrowReader arrowReader;
    private final Map<Integer, ?> idToConstant;
    private final Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>>
        readerFunc;

    private CloseableIterator<D> currentBatchIterator;
    private boolean finished;

    LanceCloseableIterator(
        String path,
        Map<String, String> storageOptions,
        List<String> columns,
        int batchSize,
        Map<Integer, ?> idToConstant,
        Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<D>> readerFunc)
        throws IOException {
      this.allocator = new RootAllocator(Long.MAX_VALUE);
      this.idToConstant = idToConstant;
      this.readerFunc = readerFunc;
      this.finished = false;

      this.fileReader = LanceFileReader.open(path, storageOptions, allocator);
      this.arrowReader = fileReader.readAll(columns, null, batchSize);
    }

    @Override
    public boolean hasNext() {
      if (finished) {
        return false;
      }

      // Loop instead of a single load: skips empty batches (e.g. the zero-row batch written to
      // establish the schema of an otherwise empty file) rather than ending the stream early.
      while (currentBatchIterator == null || !currentBatchIterator.hasNext()) {
        try {
          if (!arrowReader.loadNextBatch()) {
            finished = true;
            return false;
          }

          // release the previous batch's iterator before wrapping the next batch
          closeCurrentBatchIterator();
          VectorSchemaRoot batch = arrowReader.getVectorSchemaRoot();
          CloseableIterable<D> records =
              readerFunc.apply(new java.util.AbstractMap.SimpleEntry<>(batch, idToConstant));
          currentBatchIterator = records.iterator();
        } catch (IOException e) {
          throw new java.io.UncheckedIOException("Failed to load next batch from Lance reader", e);
        }
      }

      return true;
    }

    @Override
    public D next() {
      // honor the Iterator contract: next() without hasNext() must not NPE
      if (!hasNext()) {
        throw new NoSuchElementException("No more records in Lance file");
      }

      return currentBatchIterator.next();
    }

    @Override
    public void close() throws IOException {
      try {
        closeCurrentBatchIterator();
        arrowReader.close();
        fileReader.close();
      } catch (Exception e) {
        throw new IOException("Failed to close Lance reader", e);
      } finally {
        allocator.close();
      }
    }

    /** Closes and clears the current per-batch iterator, if any. */
    private void closeCurrentBatchIterator() {
      if (currentBatchIterator != null) {
        try {
          currentBatchIterator.close();
        } catch (IOException e) {
          throw new java.io.UncheckedIOException("Failed to close batch iterator", e);
        }
        currentBatchIterator = null;
      }
    }
  }
}
a/lance/src/main/java/org/apache/iceberg/lance/LanceFormatModels.java b/lance/src/main/java/org/apache/iceberg/lance/LanceFormatModels.java new file mode 100644 index 000000000000..9860bb583cd0 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceFormatModels.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +import org.apache.iceberg.data.Record; +import org.apache.iceberg.formats.FormatModelRegistry; + +/** + * Registration entry point for Lance format models. + * + *

This class is auto-discovered by {@link FormatModelRegistry} via reflection when the + * iceberg-lance jar is on the classpath. The {@code register()} method is called during static + * initialization of the registry. + */ +public class LanceFormatModels { + + /** + * Registers Lance format models for the generic Record data model. + * + *

Registers two models: + * + *

    + *
  • A data/equality-delete model using {@link Record} as the data type + *
  • A position-delete model using {@link org.apache.iceberg.deletes.PositionDelete} + *
+ */ + public static void register() { + FormatModelRegistry.register( + LanceFormatModel.create( + Record.class, + Void.class, + (icebergSchema, arrowSchema, engineSchema) -> + GenericLanceWriter.create(icebergSchema, arrowSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + GenericLanceReader.buildReader(icebergSchema, arrowSchema, idToConstant))); + + FormatModelRegistry.register(LanceFormatModel.forPositionDeletes()); + + // Register Spark-specific format models when Spark is on the classpath + try { + Class.forName("org.apache.spark.sql.catalyst.InternalRow"); + org.apache.iceberg.lance.spark.SparkLanceFormatModels.register(); + } catch (ClassNotFoundException e) { + // Spark not on classpath, skip Spark registrations + } + } + + private LanceFormatModels() {} +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/LanceSchemaUtil.java b/lance/src/main/java/org/apache/iceberg/lance/LanceSchemaUtil.java new file mode 100644 index 000000000000..d5732227b948 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/LanceSchemaUtil.java @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
package org.apache.iceberg.lance;

import java.util.List;
import java.util.Map;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * Utilities for converting between Iceberg schemas and Arrow schemas used by Lance.
 *
 * <p>Lance is natively Arrow-based, so the "file schema" type for Lance is Arrow's {@link Schema}.
 * This util handles bidirectional conversion, storing Iceberg field IDs in Arrow field metadata so
 * they survive round-trips.
 */
public class LanceSchemaUtil {
  // reuses the Parquet field-id metadata key so Arrow-based tooling recognizes the IDs
  static final String ICEBERG_FIELD_ID = "PARQUET:field_id";
  static final String ICEBERG_SCHEMA_KEY = "iceberg.schema";

  private LanceSchemaUtil() {}

  /** Convert an Iceberg schema to an Arrow schema, preserving field IDs in metadata. */
  public static Schema icebergToArrow(org.apache.iceberg.Schema icebergSchema) {
    List<Field> fields = Lists.newArrayList();
    for (Types.NestedField column : icebergSchema.columns()) {
      fields.add(convertField(column));
    }
    return new Schema(fields);
  }

  /** Convert an Arrow schema back to an Iceberg schema, recovering field IDs from metadata. */
  public static org.apache.iceberg.Schema arrowToIceberg(Schema arrowSchema) {
    List<Types.NestedField> columns = Lists.newArrayList();
    for (Field field : arrowSchema.getFields()) {
      columns.add(convertToIcebergField(field));
    }
    return new org.apache.iceberg.Schema(columns);
  }

  /** Extract projected top-level column names from an Iceberg schema. */
  public static List<String> columnNames(org.apache.iceberg.Schema schema) {
    List<String> names = Lists.newArrayList();
    for (Types.NestedField field : schema.columns()) {
      names.add(field.name());
    }
    return names;
  }

  /** Converts one Iceberg field (recursively, for nested types) to an Arrow field. */
  private static Field convertField(Types.NestedField icebergField) {
    Map<String, String> metadata = Maps.newHashMap();
    metadata.put(ICEBERG_FIELD_ID, String.valueOf(icebergField.fieldId()));

    ArrowType arrowType = toArrowType(icebergField.type());
    List<Field> children = Lists.newArrayList();

    if (icebergField.type().isStructType()) {
      for (Types.NestedField child : icebergField.type().asStructType().fields()) {
        children.add(convertField(child));
      }
    } else if (icebergField.type().isListType()) {
      Types.ListType listType = icebergField.type().asListType();
      Types.NestedField elementField = listType.fields().get(0);
      children.add(convertField(elementField));
    } else if (icebergField.type().isMapType()) {
      // Arrow maps are list<struct<key, value>>; synthesize the non-nullable "entries" struct
      Types.MapType mapType = icebergField.type().asMapType();
      Field keyField = convertField(mapType.fields().get(0));
      Field valueField = convertField(mapType.fields().get(1));
      List<Field> entryChildren = Lists.newArrayList();
      entryChildren.add(keyField);
      entryChildren.add(valueField);
      Field entriesField =
          new Field(
              "entries", new FieldType(false, ArrowType.Struct.INSTANCE, null), entryChildren);
      children.add(entriesField);
    }

    return new Field(
        icebergField.name(),
        new FieldType(icebergField.isOptional(), arrowType, null, metadata),
        children);
  }

  /**
   * Maps an Iceberg type to its Arrow equivalent.
   *
   * @throws UnsupportedOperationException for Iceberg types with no Arrow mapping
   */
  private static ArrowType toArrowType(Type type) {
    if (type.isStructType()) {
      return ArrowType.Struct.INSTANCE;
    } else if (type.isListType()) {
      return ArrowType.List.INSTANCE;
    } else if (type.isMapType()) {
      return new ArrowType.Map(false);
    }

    Type.PrimitiveType primitive = type.asPrimitiveType();
    switch (primitive.typeId()) {
      case BOOLEAN:
        return ArrowType.Bool.INSTANCE;
      case INTEGER:
        return new ArrowType.Int(32, true);
      case LONG:
        return new ArrowType.Int(64, true);
      case FLOAT:
        return new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE);
      case DOUBLE:
        return new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE);
      case DATE:
        return new ArrowType.Date(DateUnit.DAY);
      case TIME:
        return new ArrowType.Time(TimeUnit.MICROSECOND, 64);
      case TIMESTAMP:
        // the "UTC" timezone marks adjust-to-UTC (timestamptz); null marks timestamp-without-zone
        Types.TimestampType tsType = (Types.TimestampType) primitive;
        return new ArrowType.Timestamp(
            TimeUnit.MICROSECOND, tsType.shouldAdjustToUTC() ? "UTC" : null);
      case TIMESTAMP_NANO:
        Types.TimestampNanoType tsNanoType = (Types.TimestampNanoType) primitive;
        return new ArrowType.Timestamp(
            TimeUnit.NANOSECOND, tsNanoType.shouldAdjustToUTC() ? "UTC" : null);
      case STRING:
        return ArrowType.Utf8.INSTANCE;
      case UUID:
        return new ArrowType.FixedSizeBinary(16);
      case FIXED:
        Types.FixedType fixedType = (Types.FixedType) primitive;
        return new ArrowType.FixedSizeBinary(fixedType.length());
      case BINARY:
        return ArrowType.Binary.INSTANCE;
      case DECIMAL:
        Types.DecimalType decimalType = (Types.DecimalType) primitive;
        return new ArrowType.Decimal(decimalType.precision(), decimalType.scale(), 128);
      default:
        throw new UnsupportedOperationException("Unsupported Iceberg type: " + primitive);
    }
  }

  /** Converts one Arrow field back to an Iceberg field, recovering its field ID. */
  private static Types.NestedField convertToIcebergField(Field arrowField) {
    int fieldId = extractFieldId(arrowField);
    boolean isOptional = arrowField.isNullable();
    String name = arrowField.getName();
    Type icebergType = toIcebergType(arrowField);

    if (isOptional) {
      return Types.NestedField.optional(fieldId, name, icebergType);
    } else {
      return Types.NestedField.required(fieldId, name, icebergType);
    }
  }

  /** Reads the Iceberg field ID from Arrow field metadata; -1 if the file carries none. */
  private static int extractFieldId(Field field) {
    Map<String, String> metadata = field.getMetadata();
    if (metadata != null && metadata.containsKey(ICEBERG_FIELD_ID)) {
      return Integer.parseInt(metadata.get(ICEBERG_FIELD_ID));
    }
    // Fallback: assign a negative field ID (will need proper resolution via name mapping)
    return -1;
  }

  private static Type toIcebergType(Field arrowField) {
    ArrowType arrowType = arrowField.getType();

    if (arrowType instanceof ArrowType.Struct) {
      List<Types.NestedField> fields = Lists.newArrayList();
      for (Field child : arrowField.getChildren()) {
        fields.add(convertToIcebergField(child));
      }
      return Types.StructType.of(fields);
    }

    if (arrowType instanceof ArrowType.List) {
      Preconditions.checkArgument(
          arrowField.getChildren().size() == 1, "List type must have exactly one child");
      Field elementField = arrowField.getChildren().get(0);
      Types.NestedField element = convertToIcebergField(elementField);
      if (element.isOptional()) {
        return Types.ListType.ofOptional(element.fieldId(), element.type());
      } else {
        return Types.ListType.ofRequired(element.fieldId(), element.type());
      }
    }

    if (arrowType instanceof ArrowType.Map) {
      Preconditions.checkArgument(
          arrowField.getChildren().size() == 1, "Map type must have entries child");
      Field entriesField = arrowField.getChildren().get(0);
      Preconditions.checkArgument(
          entriesField.getChildren().size() == 2, "Map entries must have key and value");
      Types.NestedField keyField = convertToIcebergField(entriesField.getChildren().get(0));
      Types.NestedField valueField = convertToIcebergField(entriesField.getChildren().get(1));
      if (valueField.isOptional()) {
        return Types.MapType.ofOptional(
            keyField.fieldId(), valueField.fieldId(), keyField.type(), valueField.type());
      } else {
        return Types.MapType.ofRequired(
            keyField.fieldId(), valueField.fieldId(), keyField.type(), valueField.type());
      }
    }

    return toIcebergPrimitive(arrowType);
  }

  private static Type toIcebergPrimitive(ArrowType arrowType) {
    if (arrowType instanceof ArrowType.Bool) {
      return Types.BooleanType.get();
    } else if (arrowType instanceof ArrowType.Int) {
      ArrowType.Int intType = (ArrowType.Int) arrowType;
      return intType.getBitWidth() <= 32 ? Types.IntegerType.get() : Types.LongType.get();
    } else if (arrowType instanceof ArrowType.FloatingPoint) {
      ArrowType.FloatingPoint fpType = (ArrowType.FloatingPoint) arrowType;
      return fpType.getPrecision() == FloatingPointPrecision.SINGLE
          ? Types.FloatType.get()
          : Types.DoubleType.get();
    } else if (arrowType instanceof ArrowType.Decimal) {
      ArrowType.Decimal decType = (ArrowType.Decimal) arrowType;
      return Types.DecimalType.of(decType.getPrecision(), decType.getScale());
    } else if (arrowType instanceof ArrowType.Utf8) {
      return Types.StringType.get();
    } else if (arrowType instanceof ArrowType.Binary) {
      return Types.BinaryType.get();
    } else {
      return toIcebergTemporalOrFixedType(arrowType);
    }
  }

  private static Type toIcebergTemporalOrFixedType(ArrowType arrowType) {
    if (arrowType instanceof ArrowType.FixedSizeBinary) {
      // NOTE(review): 16-byte fixed binary is ambiguous — both Iceberg uuid and fixed(16) map to
      // it, and we cannot distinguish them without extra metadata; uuid is assumed here.
      ArrowType.FixedSizeBinary fsbType = (ArrowType.FixedSizeBinary) arrowType;
      return fsbType.getByteWidth() == 16
          ? Types.UUIDType.get()
          : Types.FixedType.ofLength(fsbType.getByteWidth());
    } else if (arrowType instanceof ArrowType.Date) {
      return Types.DateType.get();
    } else if (arrowType instanceof ArrowType.Time) {
      // assumes microsecond unit, which is all icebergToArrow produces — TODO confirm for
      // externally-written Lance files
      return Types.TimeType.get();
    } else if (arrowType instanceof ArrowType.Timestamp) {
      ArrowType.Timestamp tsType = (ArrowType.Timestamp) arrowType;
      // a non-null timezone (written as "UTC" by icebergToArrow) marks an adjust-to-UTC timestamp;
      // this applies to nanosecond timestamps too, so timestamp_ns without zone round-trips
      boolean adjustToUTC = tsType.getTimezone() != null;
      if (tsType.getUnit() == TimeUnit.NANOSECOND) {
        return adjustToUTC
            ? Types.TimestampNanoType.withZone()
            : Types.TimestampNanoType.withoutZone();
      }
      return adjustToUTC ? Types.TimestampType.withZone() : Types.TimestampType.withoutZone();
    } else {
      throw new UnsupportedOperationException("Unsupported Arrow type: " + arrowType);
    }
  }
}
+ */ +class ConstantColumnVector extends ColumnVector { + private final Object value; + private final int numRows; + private final boolean isNull; + + ConstantColumnVector(DataType sparkType, int numRows, Object value) { + super(sparkType); + this.numRows = numRows; + this.value = value; + this.isNull = (value == null); + } + + @Override + public void close() {} + + @Override + public boolean hasNull() { + return isNull; + } + + @Override + public int numNulls() { + return isNull ? numRows : 0; + } + + @Override + public boolean isNullAt(int rowId) { + return isNull; + } + + @Override + public boolean getBoolean(int rowId) { + return (Boolean) value; + } + + @Override + public byte getByte(int rowId) { + return ((Number) value).byteValue(); + } + + @Override + public short getShort(int rowId) { + return ((Number) value).shortValue(); + } + + @Override + public int getInt(int rowId) { + return ((Number) value).intValue(); + } + + @Override + public long getLong(int rowId) { + return ((Number) value).longValue(); + } + + @Override + public float getFloat(int rowId) { + return ((Number) value).floatValue(); + } + + @Override + public double getDouble(int rowId) { + return ((Number) value).doubleValue(); + } + + @Override + public Decimal getDecimal(int rowId, int precision, int scale) { + return Decimal.apply((java.math.BigDecimal) value, precision, scale); + } + + @Override + public UTF8String getUTF8String(int rowId) { + return UTF8String.fromString(value.toString()); + } + + @Override + public byte[] getBinary(int rowId) { + if (value instanceof byte[]) { + return (byte[]) value; + } + return null; + } + + @Override + public ColumnarArray getArray(int rowId) { + throw new UnsupportedOperationException("Array constants not supported"); + } + + @Override + public ColumnarMap getMap(int rowId) { + throw new UnsupportedOperationException("Map constants not supported"); + } + + @Override + public ColumnVector getChild(int ordinal) { + throw new 
UnsupportedOperationException("Struct constants not supported"); + } +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/spark/IcebergTypeToSparkType.java b/lance/src/main/java/org/apache/iceberg/lance/spark/IcebergTypeToSparkType.java new file mode 100644 index 000000000000..c904ee21ae6c --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/spark/IcebergTypeToSparkType.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance.spark; + +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.types.DataType; +import org.apache.spark.sql.types.DataTypes; + +/** Converts Iceberg types to Spark DataTypes. Minimal subset needed for ConstantColumnVector. 
*/ +class IcebergTypeToSparkType { + + private IcebergTypeToSparkType() {} + + static DataType convert(Type type) { + switch (type.typeId()) { + case BOOLEAN: + return DataTypes.BooleanType; + case INTEGER: + return DataTypes.IntegerType; + case LONG: + return DataTypes.LongType; + case FLOAT: + return DataTypes.FloatType; + case DOUBLE: + return DataTypes.DoubleType; + case DATE: + return DataTypes.DateType; + case TIME: + return DataTypes.LongType; + case TIMESTAMP: + return DataTypes.TimestampType; + case STRING: + return DataTypes.StringType; + case BINARY: + return DataTypes.BinaryType; + case UUID: + return DataTypes.BinaryType; + case FIXED: + return DataTypes.BinaryType; + case DECIMAL: + Types.DecimalType dec = (Types.DecimalType) type; + return DataTypes.createDecimalType(dec.precision(), dec.scale()); + default: + throw new UnsupportedOperationException("Unsupported Iceberg type for Spark: " + type); + } + } +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceColumnarReader.java b/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceColumnarReader.java new file mode 100644 index 000000000000..72ba3a5af2a5 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceColumnarReader.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
package org.apache.iceberg.lance.spark;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.vectorized.ArrowColumnVector;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

/**
 * Converts Arrow {@link VectorSchemaRoot} batches from Lance into Spark {@link ColumnarBatch}.
 *
 * <p>This is a near-zero-copy path: Arrow FieldVectors are wrapped directly in
 * {@link ArrowColumnVector} without data copying.
 */
public class SparkLanceColumnarReader {

  private SparkLanceColumnarReader() {}

  /**
   * Builds a reader function that converts Arrow batches to {@link ColumnarBatch}.
   *
   * @param expectedSchema the Iceberg schema defining expected columns
   * @param idToConstant constant values for partition/metadata columns, keyed by field id
   * @return a function mapping a (batch, constants) entry to a single-batch iterable
   */
  public static Function<
          Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<ColumnarBatch>>
      buildReader(Schema expectedSchema, Map<Integer, ?> idToConstant) {
    return entry -> {
      VectorSchemaRoot batch = entry.getKey();
      int rowCount = batch.getRowCount();
      // NOTE(review): this reader uses the captured idToConstant and ignores entry.getValue(),
      // while SparkLanceRowReader does the opposite — confirm which constants source is
      // authoritative and align the two readers.

      List<ColumnVector> columns = Lists.newArrayList();
      for (Types.NestedField field : expectedSchema.columns()) {
        DataType sparkType = IcebergTypeToSparkType.convert(field.type());
        if (idToConstant != null && idToConstant.containsKey(field.fieldId())) {
          // partition/metadata column: one fixed value for the whole batch
          columns.add(
              new ConstantColumnVector(sparkType, rowCount, idToConstant.get(field.fieldId())));
        } else {
          // columns are matched by name; a missing vector is surfaced as an all-null column
          FieldVector arrowVector = batch.getVector(field.name());
          if (arrowVector != null) {
            columns.add(new ArrowColumnVector(arrowVector));
          } else {
            columns.add(new ConstantColumnVector(sparkType, rowCount, null));
          }
        }
      }

      ColumnarBatch columnarBatch = new ColumnarBatch(columns.toArray(new ColumnVector[0]));
      columnarBatch.setNumRows(rowCount);

      return new CloseableIterable<ColumnarBatch>() {
        @Override
        public CloseableIterator<ColumnarBatch> iterator() {
          return CloseableIterator.withClose(Collections.singletonList(columnarBatch).iterator());
        }

        @Override
        public void close() throws IOException {
          // batch lifecycle (Arrow buffers) is managed by the enclosing LanceCloseableIterator
        }
      };
    };
  }
}
000000000000..2edf85c2a631 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceFormatModels.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance.spark; + +import org.apache.iceberg.formats.FormatModelRegistry; +import org.apache.iceberg.lance.LanceFormatModel; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +/** + * Registers Lance format models for Spark's InternalRow and ColumnarBatch data types. + * + *

This class is auto-discovered by {@link FormatModelRegistry} via reflection when the + * iceberg-spark jar and iceberg-lance jar are both on the classpath. + */ +public class SparkLanceFormatModels { + public static void register() { + // ColumnarBatch — vectorized, near-zero-copy via ArrowColumnVector + FormatModelRegistry.register( + LanceFormatModel.create( + ColumnarBatch.class, + StructType.class, + null, // no writer for ColumnarBatch (read-only vectorized path) + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + SparkLanceColumnarReader.buildReader(icebergSchema, idToConstant))); + + // InternalRow — row-based reads and writes + FormatModelRegistry.register( + LanceFormatModel.create( + InternalRow.class, + StructType.class, + (icebergSchema, arrowSchema, engineSchema) -> + SparkLanceWriter.buildWriter(icebergSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + SparkLanceRowReader.buildReader(icebergSchema, idToConstant))); + } + + private SparkLanceFormatModels() {} +} diff --git a/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceRowReader.java b/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceRowReader.java new file mode 100644 index 000000000000..b60fdc18ff86 --- /dev/null +++ b/lance/src/main/java/org/apache/iceberg/lance/spark/SparkLanceRowReader.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
package org.apache.iceberg.lance.spark;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.TimeStampMicroVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

/**
 * Converts Arrow {@link VectorSchemaRoot} batches from Lance into Spark {@link InternalRow}
 * objects.
 *
 * <p>Iterates rows within each batch, extracting values from Arrow vectors and converting them to
 * Spark internal types. All rows of a batch are materialized eagerly into a list.
 */
public class SparkLanceRowReader {

  private SparkLanceRowReader() {}

  /**
   * Builds a reader function that converts Arrow batches to InternalRow.
   *
   * @param expectedSchema the Iceberg schema defining expected columns
   * @param idToConstant constant values for partition/metadata columns
   * @return a function mapping a (batch, constants) entry to an iterable of rows
   */
  public static Function<
          Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<InternalRow>>
      buildReader(Schema expectedSchema, Map<Integer, ?> idToConstant) {
    return entry -> {
      VectorSchemaRoot batch = entry.getKey();
      // NOTE(review): constants are taken from the per-batch entry value; the captured
      // idToConstant parameter is unused, the inverse of SparkLanceColumnarReader — confirm
      // which source is authoritative and make the two readers consistent.
      Map<Integer, ?> constants = entry.getValue();
      int rowCount = batch.getRowCount();

      List<InternalRow> rows =
          org.apache.iceberg.relocated.com.google.common.collect.Lists.newArrayListWithCapacity(
              rowCount);
      List<Types.NestedField> columns = expectedSchema.columns();
      int numCols = columns.size();

      for (int rowIdx = 0; rowIdx < rowCount; rowIdx++) {
        Object[] values = new Object[numCols];
        for (int colIdx = 0; colIdx < numCols; colIdx++) {
          Types.NestedField field = columns.get(colIdx);

          if (constants != null && constants.containsKey(field.fieldId())) {
            values[colIdx] = convertToSparkConstant(constants.get(field.fieldId()), field);
          } else {
            // columns are matched by name; missing or null cells become Java nulls
            FieldVector vector = batch.getVector(field.name());
            if (vector == null || vector.isNull(rowIdx)) {
              values[colIdx] = null;
            } else {
              values[colIdx] = readSparkValue(vector, rowIdx, field);
            }
          }
        }
        rows.add(new GenericInternalRow(values));
      }

      return new CloseableIterable<InternalRow>() {
        @Override
        public CloseableIterator<InternalRow> iterator() {
          return CloseableIterator.withClose(rows.iterator());
        }

        @Override
        public void close() throws IOException {
          // rows are fully materialized; nothing to release here
        }
      };
    };
  }

  /** Reads one cell from an Arrow vector and converts it to Spark's internal representation. */
  private static Object readSparkValue(FieldVector vector, int rowIdx, Types.NestedField field) {
    switch (field.type().typeId()) {
      case BOOLEAN:
        return ((BitVector) vector).get(rowIdx) == 1;
      case INTEGER:
        return ((IntVector) vector).get(rowIdx);
      case LONG:
        return ((BigIntVector) vector).get(rowIdx);
      case FLOAT:
        return ((Float4Vector) vector).get(rowIdx);
      case DOUBLE:
        return ((Float8Vector) vector).get(rowIdx);
      case DATE:
        return ((DateDayVector) vector).get(rowIdx);
      case TIME:
        return ((TimeMicroVector) vector).get(rowIdx);
      case TIMESTAMP:
        // timestamptz and timestamp use distinct Arrow vector classes
        if (((Types.TimestampType) field.type()).shouldAdjustToUTC()) {
          return ((TimeStampMicroTZVector) vector).get(rowIdx);
        } else {
          return ((TimeStampMicroVector) vector).get(rowIdx);
        }
      case STRING:
        return UTF8String.fromBytes(((VarCharVector) vector).get(rowIdx));
      case UUID:
      case FIXED:
        // UUID and fixed-width values surface to Spark as raw bytes
        return ((FixedSizeBinaryVector) vector).get(rowIdx);
      case BINARY:
        return ((VarBinaryVector) vector).get(rowIdx);
      case DECIMAL:
        BigDecimal bd = ((DecimalVector) vector).getObject(rowIdx);
        Types.DecimalType decType = (Types.DecimalType) field.type();
        return Decimal.apply(bd, decType.precision(), decType.scale());
      default:
        throw new UnsupportedOperationException(
            "Unsupported type for Spark row reader: " + field.type().typeId());
    }
  }

  /** Converts a constant (partition/metadata) value to Spark's internal representation. */
  private static Object convertToSparkConstant(Object value, Types.NestedField field) {
    if (value == null) {
      return null;
    }
    switch (field.type().typeId()) {
      case STRING:
        return UTF8String.fromString(value.toString());
      case DECIMAL:
        Types.DecimalType decType = (Types.DecimalType) field.type();
        return Decimal.apply((BigDecimal) value, decType.precision(), decType.scale());
      default:
        // numeric/temporal constants already use Spark-compatible primitives
        return value;
    }
  }
}
package org.apache.iceberg.lance.spark;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;

/**
 * Converts Spark {@link InternalRow} to a Map of column name → value for writing via Lance's
 * Arrow-based writer.
 */
public class SparkLanceWriter {

  private SparkLanceWriter() {}

  /**
   * Builds a writer function that converts an InternalRow to a name→value Map.
   *
   * @param icebergSchema the Iceberg schema defining the columns; row ordinals are assumed to
   *     match the schema's column order
   * @return a function that extracts values from InternalRow into a name→value Map
   */
  public static Function<InternalRow, Map<String, Object>> buildWriter(Schema icebergSchema) {
    List<Types.NestedField> columns = icebergSchema.columns();
    return row -> {
      Map<String, Object> values = Maps.newHashMapWithExpectedSize(columns.size());
      for (int i = 0; i < columns.size(); i++) {
        Types.NestedField field = columns.get(i);
        // null cells are kept as explicit null map entries so the writer sees every column
        if (row.isNullAt(i)) {
          values.put(field.name(), null);
        } else {
          values.put(field.name(), extractValue(row, i, field));
        }
      }
      return values;
    };
  }

  /** Extracts one non-null cell from the row, converting Spark internal types to Java values. */
  private static Object extractValue(InternalRow row, int ordinal, Types.NestedField field) {
    switch (field.type().typeId()) {
      case BOOLEAN:
        return row.getBoolean(ordinal);
      case INTEGER:
        return row.getInt(ordinal);
      case LONG:
        return row.getLong(ordinal);
      case FLOAT:
        return row.getFloat(ordinal);
      case DOUBLE:
        return row.getDouble(ordinal);
      case DATE:
        // Spark stores dates as int days since epoch
        return row.getInt(ordinal);
      case TIME:
        return row.getLong(ordinal);
      case TIMESTAMP:
        // Spark stores timestamps as long microseconds since epoch
        return row.getLong(ordinal);
      case STRING:
        UTF8String utf8 = row.getUTF8String(ordinal);
        return utf8 != null ? utf8.toString() : null;
      case UUID:
      case FIXED:
      case BINARY:
        // all byte-backed types pass through as byte[]
        return row.getBinary(ordinal);
      case DECIMAL:
        Types.DecimalType decType = (Types.DecimalType) field.type();
        Decimal decimal = row.getDecimal(ordinal, decType.precision(), decType.scale());
        return decimal != null ? decimal.toJavaBigDecimal() : null;
      default:
        throw new UnsupportedOperationException(
            "Unsupported type for Spark writer: " + field.type().typeId());
    }
  }
}

Note: Full round-trip tests require the Lance JNI native library to be available. These tests + * validate the model configuration and builder wiring without requiring native code. + */ +class TestLanceFormatModel { + + @TempDir Path tempDir; + + @Test + void testFormatReturnsLance() { + LanceFormatModel model = + LanceFormatModel.create( + Record.class, + Void.class, + (icebergSchema, arrowSchema, engineSchema) -> + GenericLanceWriter.create(icebergSchema, arrowSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + GenericLanceReader.buildReader(icebergSchema, arrowSchema, idToConstant)); + + assertThat(model.format()).isEqualTo(FileFormat.LANCE); + } + + @Test + void testTypeAndSchemaType() { + LanceFormatModel model = + LanceFormatModel.create( + Record.class, + Void.class, + (icebergSchema, arrowSchema, engineSchema) -> + GenericLanceWriter.create(icebergSchema, arrowSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + GenericLanceReader.buildReader(icebergSchema, arrowSchema, idToConstant)); + + assertThat(model.type()).isEqualTo(Record.class); + assertThat(model.schemaType()).isEqualTo(Void.class); + } + + @Test + void testPositionDeleteModel() { + LanceFormatModel model = LanceFormatModel.forPositionDeletes(); + + assertThat(model.format()).isEqualTo(FileFormat.LANCE); + } + + @Test + void testWriteBuilderEncryptionThrows() { + LanceFormatModel model = + LanceFormatModel.create( + Record.class, + Void.class, + (icebergSchema, arrowSchema, engineSchema) -> + GenericLanceWriter.create(icebergSchema, arrowSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + GenericLanceReader.buildReader(icebergSchema, arrowSchema, idToConstant)); + + EncryptedOutputFile encryptedOutput = + EncryptedFiles.plainAsEncryptedOutput( + Files.localOutput(tempDir.resolve("enc-test.lance").toFile())); + + ModelWriteBuilder builder = model.writeBuilder(encryptedOutput); + + assertThatThrownBy(() -> 
builder.withFileEncryptionKey(ByteBuffer.allocate(16))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Lance does not support file encryption"); + + assertThatThrownBy(() -> builder.withAADPrefix(ByteBuffer.allocate(16))) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessage("Lance does not support file encryption"); + } +} diff --git a/lance/src/test/java/org/apache/iceberg/lance/TestLanceRoundTrip.java b/lance/src/test/java/org/apache/iceberg/lance/TestLanceRoundTrip.java new file mode 100644 index 000000000000..e9f6592bf3c6 --- /dev/null +++ b/lance/src/test/java/org/apache/iceberg/lance/TestLanceRoundTrip.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.lance; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.encryption.EncryptedFiles; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.formats.ModelWriteBuilder; +import org.apache.iceberg.formats.ReadBuilder; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +/** + * Integration tests for Lance format round-trip read/write via LanceFormatModel. + * + *

Requires the Lance JNI native library to be available on the classpath (built from + * lance/java/lance-jni). + */ +class TestLanceRoundTrip { + + @TempDir Path tempDir; + + private static final Schema SIMPLE_SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "age", Types.IntegerType.get()), + Types.NestedField.optional(4, "score", Types.DoubleType.get())); + + private LanceFormatModel createModel() { + return LanceFormatModel.create( + Record.class, + Void.class, + (icebergSchema, arrowSchema, engineSchema) -> + GenericLanceWriter.create(icebergSchema, arrowSchema), + (icebergSchema, arrowSchema, engineSchema, idToConstant) -> + GenericLanceReader.buildReader(icebergSchema, arrowSchema, idToConstant)); + } + + private File lanceFile(String name) { + return tempDir.resolve(name + ".lance").toFile(); + } + + private List writeRecords( + LanceFormatModel model, File file, Schema schema, List records) + throws IOException { + EncryptedOutputFile encryptedOutput = + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(file)); + + ModelWriteBuilder writeBuilder = + model.writeBuilder(encryptedOutput).schema(schema).content(FileContent.DATA); + + try (FileAppender appender = writeBuilder.build()) { + for (Record record : records) { + appender.add(record); + } + } + return records; + } + + private List readRecords( + LanceFormatModel model, File file, Schema projectedSchema) throws IOException { + InputFile inputFile = Files.localInput(file); + ReadBuilder readBuilder = model.readBuilder(inputFile); + + if (projectedSchema != null) { + readBuilder.project(projectedSchema); + } + + List result = Lists.newArrayList(); + try (CloseableIterable iterable = readBuilder.build()) { + for (Record record : iterable) { + result.add(record); + } + } + return result; + } + + @Test + void testBasicPrimitiveRoundTrip() throws IOException { + 
LanceFormatModel model = createModel(); + File file = lanceFile("basic-roundtrip"); + + List written = Lists.newArrayList(); + for (int i = 0; i < 100; i++) { + GenericRecord record = GenericRecord.create(SIMPLE_SCHEMA); + record.setField("id", i); + record.setField("name", "user_" + i); + record.setField("age", 20 + (i % 50)); + record.setField("score", i * 1.5); + written.add(record); + } + + writeRecords(model, file, SIMPLE_SCHEMA, written); + List read = readRecords(model, file, SIMPLE_SCHEMA); + + assertThat(read).hasSize(100); + for (int i = 0; i < 100; i++) { + assertThat(read.get(i).getField("id")).isEqualTo(written.get(i).getField("id")); + assertThat(read.get(i).getField("name")).isEqualTo(written.get(i).getField("name")); + assertThat(read.get(i).getField("age")).isEqualTo(written.get(i).getField("age")); + assertThat(read.get(i).getField("score")).isEqualTo(written.get(i).getField("score")); + } + } + + @Test + void testEmptyFile() throws IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("empty"); + + writeRecords(model, file, SIMPLE_SCHEMA, Lists.newArrayList()); + List read = readRecords(model, file, SIMPLE_SCHEMA); + + assertThat(read).isEmpty(); + } + + @Test + void testNullHandling() throws IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("nulls"); + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "value", Types.DoubleType.get())); + + List written = Lists.newArrayList(); + for (int i = 0; i < 50; i++) { + GenericRecord record = GenericRecord.create(schema); + record.setField("id", i); + record.setField("name", i % 3 == 0 ? null : "name_" + i); + record.setField("value", i % 5 == 0 ? 
null : i * 2.0); + written.add(record); + } + + writeRecords(model, file, schema, written); + List read = readRecords(model, file, schema); + + assertThat(read).hasSize(50); + for (int i = 0; i < 50; i++) { + assertThat(read.get(i).getField("id")).isEqualTo(i); + if (i % 3 == 0) { + assertThat(read.get(i).getField("name")).isNull(); + } else { + assertThat(read.get(i).getField("name")).isEqualTo("name_" + i); + } + if (i % 5 == 0) { + assertThat(read.get(i).getField("value")).isNull(); + } else { + assertThat(read.get(i).getField("value")).isEqualTo(i * 2.0); + } + } + } + + @Test + void testColumnProjection() throws IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("projection"); + + Schema fullSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "city", Types.StringType.get()), + Types.NestedField.optional(4, "age", Types.IntegerType.get()), + Types.NestedField.optional(5, "score", Types.DoubleType.get())); + + List written = Lists.newArrayList(); + for (int i = 0; i < 20; i++) { + GenericRecord record = GenericRecord.create(fullSchema); + record.setField("id", i); + record.setField("name", "user_" + i); + record.setField("city", "city_" + i); + record.setField("age", 25 + i); + record.setField("score", i * 3.14); + written.add(record); + } + + writeRecords(model, file, fullSchema, written); + + // Read back only id and name + Schema projectedSchema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())); + + List read = readRecords(model, file, projectedSchema); + + assertThat(read).hasSize(20); + for (int i = 0; i < 20; i++) { + assertThat(read.get(i).getField("id")).isEqualTo(i); + assertThat(read.get(i).getField("name")).isEqualTo("user_" + i); + } + } + + @Test + void testBatchSizeConfiguration() throws 
IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("batch-size"); + + List written = Lists.newArrayList(); + for (int i = 0; i < 1000; i++) { + GenericRecord record = GenericRecord.create(SIMPLE_SCHEMA); + record.setField("id", i); + record.setField("name", "user_" + i); + record.setField("age", 20 + (i % 60)); + record.setField("score", i * 0.1); + written.add(record); + } + + writeRecords(model, file, SIMPLE_SCHEMA, written); + + // Read with small batch size + InputFile inputFile = Files.localInput(file); + ReadBuilder readBuilder = + model.readBuilder(inputFile).project(SIMPLE_SCHEMA).recordsPerBatch(100); + + List read = Lists.newArrayList(); + try (CloseableIterable iterable = readBuilder.build()) { + for (Record record : iterable) { + read.add(record); + } + } + + assertThat(read).hasSize(1000); + for (int i = 0; i < 1000; i++) { + assertThat(read.get(i).getField("id")).isEqualTo(written.get(i).getField("id")); + } + } + + @Test + void testSchemaPreservation() throws IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("schema-preservation"); + + Schema schema = + new Schema( + Types.NestedField.required(10, "pk", Types.LongType.get()), + Types.NestedField.optional(20, "label", Types.StringType.get()), + Types.NestedField.optional(30, "active", Types.BooleanType.get()), + Types.NestedField.required(40, "ratio", Types.FloatType.get())); + + List written = Lists.newArrayList(); + for (int i = 0; i < 5; i++) { + GenericRecord record = GenericRecord.create(schema); + record.setField("pk", (long) i); + record.setField("label", "label_" + i); + record.setField("active", i % 2 == 0); + record.setField("ratio", (float) (i * 0.5)); + written.add(record); + } + + writeRecords(model, file, schema, written); + + // Read back and verify schema was preserved via metadata + List read = readRecords(model, file, schema); + assertThat(read).hasSize(5); + + // Verify field types round-tripped correctly + for (int i = 0; i 
< 5; i++) { + assertThat(read.get(i).getField("pk")).isInstanceOf(Long.class); + assertThat(read.get(i).getField("pk")).isEqualTo((long) i); + assertThat(read.get(i).getField("label")).isEqualTo("label_" + i); + assertThat(read.get(i).getField("active")).isEqualTo(i % 2 == 0); + assertThat(read.get(i).getField("ratio")).isInstanceOf(Float.class); + } + } + + @Test + void testFileLength() throws IOException { + LanceFormatModel model = createModel(); + File file = lanceFile("file-length"); + + EncryptedOutputFile encryptedOutput = + EncryptedFiles.plainAsEncryptedOutput(Files.localOutput(file)); + + ModelWriteBuilder writeBuilder = + model.writeBuilder(encryptedOutput).schema(SIMPLE_SCHEMA).content(FileContent.DATA); + + FileAppender appender = writeBuilder.build(); + try { + // Write enough records to trigger at least one batch flush (default batch size = 1024) + for (int i = 0; i < 2000; i++) { + GenericRecord record = GenericRecord.create(SIMPLE_SCHEMA); + record.setField("id", i); + record.setField("name", "user_" + i); + record.setField("age", 20 + i); + record.setField("score", i * 1.1); + appender.add(record); + } + + // After flushing at least one batch, the estimate should be > 0 + long midWriteLength = appender.length(); + assertThat(midWriteLength).isGreaterThan(0); + } finally { + appender.close(); + } + + // After close, length() returns exact file size from storage + long postCloseLength = appender.length(); + assertThat(postCloseLength).isGreaterThan(0); + assertThat(postCloseLength).isEqualTo(file.length()); + } +} diff --git a/lance/src/test/java/org/apache/iceberg/lance/TestLanceSchemaConversion.java b/lance/src/test/java/org/apache/iceberg/lance/TestLanceSchemaConversion.java new file mode 100644 index 000000000000..53e241c9b8f2 --- /dev/null +++ b/lance/src/test/java/org/apache/iceberg/lance/TestLanceSchemaConversion.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.lance; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +/** Tests for bidirectional Iceberg-Arrow schema conversion. 
*/ +class TestLanceSchemaConversion { + + @Test + void testPrimitiveTypesRoundTrip() { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "value", Types.DoubleType.get()), + Types.NestedField.optional(4, "flag", Types.BooleanType.get()), + Types.NestedField.optional(5, "count", Types.LongType.get()), + Types.NestedField.optional(6, "score", Types.FloatType.get()), + Types.NestedField.optional(7, "data", Types.BinaryType.get()), + Types.NestedField.optional(8, "event_date", Types.DateType.get()), + Types.NestedField.optional(9, "event_time", Types.TimeType.get())); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(icebergSchema); + + assertThat(arrowSchema.getFields()).hasSize(9); + assertThat(arrowSchema.getFields().get(0).getName()).isEqualTo("id"); + assertThat(arrowSchema.getFields().get(0).isNullable()).isFalse(); + assertThat(arrowSchema.getFields().get(1).getName()).isEqualTo("name"); + assertThat(arrowSchema.getFields().get(1).isNullable()).isTrue(); + + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + assertThat(recovered.columns()).hasSize(9); + assertThat(recovered.findField("id").type().typeId()) + .isEqualTo(Types.IntegerType.get().typeId()); + assertThat(recovered.findField("name").type().typeId()) + .isEqualTo(Types.StringType.get().typeId()); + assertThat(recovered.findField("value").type().typeId()) + .isEqualTo(Types.DoubleType.get().typeId()); + } + + @Test + void testDecimalType() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "amount", Types.DecimalType.of(10, 2))); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + + Types.DecimalType decimal = (Types.DecimalType) 
recovered.findField("amount").type(); + assertThat(decimal.precision()).isEqualTo(10); + assertThat(decimal.scale()).isEqualTo(2); + } + + @Test + void testTimestampTypes() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.optional(1, "ts_with_tz", Types.TimestampType.withZone()), + Types.NestedField.optional(2, "ts_no_tz", Types.TimestampType.withoutZone())); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + assertThat(arrowSchema.getFields()).hasSize(2); + + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + assertThat(recovered.findField("ts_with_tz").type()).isEqualTo(Types.TimestampType.withZone()); + assertThat(recovered.findField("ts_no_tz").type()).isEqualTo(Types.TimestampType.withoutZone()); + } + + @Test + void testFixedAndUuidTypes() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.optional(1, "uuid_col", Types.UUIDType.get()), + Types.NestedField.optional(2, "fixed_col", Types.FixedType.ofLength(8))); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + assertThat(arrowSchema.getFields()).hasSize(2); + } + + @Test + void testFieldIdPreservation() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.required(42, "special_id", Types.IntegerType.get()), + Types.NestedField.optional(99, "name", Types.StringType.get())); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + + assertThat(recovered.findField("special_id").fieldId()).isEqualTo(42); + assertThat(recovered.findField("name").fieldId()).isEqualTo(99); + } + + @Test + void testColumnNameExtraction() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "col_a", Types.IntegerType.get()), + Types.NestedField.optional(2, "col_b", Types.StringType.get()), + 
Types.NestedField.optional(3, "col_c", Types.DoubleType.get())); + + List names = LanceSchemaUtil.columnNames(schema); + assertThat(names).containsExactly("col_a", "col_b", "col_c"); + } + + @Test + void testStructType() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.optional( + 1, + "address", + Types.StructType.of( + Types.NestedField.required(2, "street", Types.StringType.get()), + Types.NestedField.optional(3, "city", Types.StringType.get())))); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + assertThat(arrowSchema.getFields()).hasSize(1); + assertThat(arrowSchema.getFields().get(0).getChildren()).hasSize(2); + + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + Types.NestedField addressField = recovered.findField("address"); + assertThat(addressField.type().isStructType()).isTrue(); + Types.StructType structType = addressField.type().asStructType(); + assertThat(structType.fields()).hasSize(2); + assertThat(structType.field("street").type().typeId()) + .isEqualTo(Types.StringType.get().typeId()); + assertThat(structType.field("city").type().typeId()).isEqualTo(Types.StringType.get().typeId()); + } + + @Test + void testListType() { + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + Types.NestedField.optional( + 1, "tags", Types.ListType.ofOptional(2, Types.StringType.get()))); + + Schema arrowSchema = LanceSchemaUtil.icebergToArrow(schema); + assertThat(arrowSchema.getFields()).hasSize(1); + assertThat(arrowSchema.getFields().get(0).getChildren()).hasSize(1); + + org.apache.iceberg.Schema recovered = LanceSchemaUtil.arrowToIceberg(arrowSchema); + Types.NestedField tagsField = recovered.findField("tags"); + assertThat(tagsField.type().isListType()).isTrue(); + Types.ListType listType = tagsField.type().asListType(); + assertThat(listType.elementType().typeId()).isEqualTo(Types.StringType.get().typeId()); + } +} diff --git a/settings.gradle 
b/settings.gradle index 70f9343a252b..69ee0a12b857 100644 --- a/settings.gradle +++ b/settings.gradle @@ -31,6 +31,7 @@ include 'azure-bundle' include 'orc' include 'arrow' include 'parquet' +include 'lance' include 'bundled-guava' include 'spark' include 'hive-metastore' @@ -57,6 +58,7 @@ project(':azure-bundle').name = 'iceberg-azure-bundle' project(':orc').name = 'iceberg-orc' project(':arrow').name = 'iceberg-arrow' project(':parquet').name = 'iceberg-parquet' +project(':lance').name = 'iceberg-lance' project(':bundled-guava').name = 'iceberg-bundled-guava' project(':spark').name = 'iceberg-spark' project(':hive-metastore').name = 'iceberg-hive-metastore' diff --git a/spark/v4.1/build.gradle b/spark/v4.1/build.gradle index a6b03f7e37a3..bcdf96b89032 100644 --- a/spark/v4.1/build.gradle +++ b/spark/v4.1/build.gradle @@ -326,6 +326,14 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio relocate 'org.roaringbitmap', 'org.apache.iceberg.shaded.org.roaringbitmap' relocate 'org.apache.datasketches', 'org.apache.iceberg.shaded.org.apache.datasketches' + // Exclude Lance module and its dependencies from the shadow jar. + // Lance SDK uses JNI with hardcoded Arrow class names — relocating Arrow breaks it. + // Users must add iceberg-lance.jar and lance-core.jar separately to the classpath. + exclude 'org/apache/iceberg/lance/**' + exclude 'org/lance/**' + exclude 'nativelib/**' + exclude 'io/questdb/**' + archiveClassifier.set(null) }