1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
@@ -28,6 +28,7 @@ public enum FileFormat {
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
LANCE("lance", false),
METADATA("metadata.json", false);

private final String ext;
40 changes: 40 additions & 0 deletions build.gradle
@@ -961,6 +961,46 @@ project(':iceberg-arrow') {
}
}

project(':iceberg-lance') {
test {
useJUnitPlatform()
jvmArgs '--add-opens=java.base/java.nio=ALL-UNNAMED'
}
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
implementation project(':iceberg-core')
implementation project(':iceberg-arrow')

implementation(libs.arrow.vector) {
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
implementation(libs.arrow.memory.netty) {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'io.netty', module: 'netty-buffer'
}

// Lance Java SDK (external dependency)
implementation 'org.lance:lance-core:4.1.0-beta.0'

// Spark (compileOnly — provided at runtime by Spark environment)
compileOnly("org.apache.spark:spark-hive_2.13:${libs.versions.spark41.get()}") {
exclude group: 'org.apache.arrow'
exclude group: 'org.apache.parquet'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'io.netty', module: 'netty-buffer'
exclude group: 'io.netty', module: 'netty-common'
exclude group: 'org.roaringbitmap'
}

testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
}
}

project(':iceberg-nessie') {
test {
useJUnitPlatform()
@@ -59,7 +59,8 @@ private FormatModelRegistry() {}
"org.apache.iceberg.data.GenericFormatModels",
"org.apache.iceberg.arrow.vectorized.ArrowFormatModels",
"org.apache.iceberg.flink.data.FlinkFormatModels",
"org.apache.iceberg.spark.source.SparkFormatModels");
"org.apache.iceberg.spark.source.SparkFormatModels",
"org.apache.iceberg.lance.LanceFormatModels");

// Format models indexed by file format and object model class
private static final Map<Pair<FileFormat, Class<?>>, FormatModel<?, ?>> MODELS =
150 changes: 150 additions & 0 deletions lance/LANCE_SDK_GAPS.md
@@ -0,0 +1,150 @@
# Lance Java SDK Gaps

This document tracks gaps in the Lance Java SDK that affect the iceberg-lance integration.
These should be addressed in the Lance Java SDK, not worked around in iceberg-lance.

## 1. File Length (getBytesWritten)

**Current workaround:** After closing the writer, we use `OutputFile.toInputFile().getLength()`
to get the file size (same pattern as ORC's FileAppender).

**What Lance already has:** The Rust `FileWriter::finish()` returns `Result<u64>` — the total
bytes written. But the JNI layer (`file_writer.rs`) discards this value in `closeNative`:

Ok(_) => {} // discards the u64

And the Java `close()` returns void.

**Proposed fix in Lance Java SDK:**
- JNI: Change `closeNative` to return `jlong` with the bytes written
- Java: Store the value in a field, add `public long getBytesWritten()` getter
- Non-breaking: `close()` stays void, `AutoCloseable` contract preserved
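A minimal sketch of the proposed API shape, using an in-memory stream in place of the JNI-backed writer (class and method names here are illustrative, not the real SDK):

```java
import java.io.ByteArrayOutputStream;

// Sketch: close() stays void so the AutoCloseable contract is preserved,
// while the byte count captured at close time is exposed via a getter.
class SketchLanceFileWriter implements AutoCloseable {
  private final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private long bytesWritten = -1;

  void write(byte[] batch) {
    out.writeBytes(batch);
  }

  @Override
  public void close() {
    // In the real SDK this value would come from closeNative(), which in
    // turn would return the u64 from Rust's FileWriter::finish().
    this.bytesWritten = out.size();
  }

  /** Total bytes written; valid only after close(), -1 before. */
  long getBytesWritten() {
    return bytesWritten;
  }
}
```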

## 2. Column Statistics (min/max/null_count per column)

**Current state:** `LanceFileAppender.metrics()` only returns `recordCount`. All other
fields (columnSizes, valueCounts, nullValueCounts, lowerBounds, upperBounds) are null.
This means Iceberg cannot do file-level pruning on Lance data files.

**What Lance is building:** PR #5639 (Column Statistics MVP) adds per-fragment min/max,
null count, and NaN count to the Rust core. Stored as Arrow IPC in the file's global
buffer. Design discussion: https://github.com/lancedb/lance/discussions/4540

**What's missing:** PR #5639 does not touch the Java SDK. Once merged, the JNI layer
and Java SDK need new APIs to read column statistics back — something like
`LanceFileReader.getColumnStatistics()`.

**Proposed fix in Lance Java SDK (after Rust PR #5639 merges):**
- JNI: Expose column statistics from the global buffer via a new native method
- Java: Add `getColumnStatistics()` to LanceFileReader returning per-column min/max/null_count
- iceberg-lance: Map those stats to Iceberg's Metrics object using Conversions.toByteBuffer()
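The iceberg-lance side of that mapping could look like the sketch below. It assumes a future `getColumnStatistics()` returning per-field-id values; in real code `Conversions.toByteBuffer()` would do the encoding, so here an int lower bound is serialized directly (Iceberg single-value bounds are little-endian):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.HashMap;
import java.util.Map;

// Sketch: turn hypothetical per-column int minimums (keyed by Iceberg field
// id) into the lowerBounds map shape that Iceberg's Metrics expects.
class StatsToMetricsSketch {
  static Map<Integer, ByteBuffer> lowerBounds(Map<Integer, Integer> intMins) {
    Map<Integer, ByteBuffer> bounds = new HashMap<>();
    for (Map.Entry<Integer, Integer> e : intMins.entrySet()) {
      // Iceberg's Conversions.toByteBuffer(Types.IntegerType.get(), min)
      // produces this same 4-byte little-endian encoding.
      ByteBuffer buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
      buf.putInt(e.getValue());
      buf.flip();
      bounds.put(e.getKey(), buf);
    }
    return bounds;
  }
}
```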

## 3. Split Planning (Parallel Reads)

**Current state:** `FileFormat.LANCE("lance", false)` — splittable is set to false.
`LanceFileAppender.splitOffsets()` returns null. This means each Lance file is read by
a single task. No within-file parallelism.

**Why this matters:** For MPP engines like Spark, parallelism comes from task count.
Without splits, one 512MB Lance file = one task = one core. With splits, that same file
could be 10 tasks across 10 cores.

**Approach (depends on Gap #1 — getBytesWritten):**

During writing, after each batch flush in LanceFileAppender, record the cumulative bytes
written so far (from `getBytesWritten()`) and the corresponding row number. Store this
mapping in the Lance file's schema metadata via `addSchemaMetadata()`:

"lance:split:0" → "0" // byte offset 0 → row 0
"lance:split:34500000" → "50000" // byte offset 34.5MB → row 50000
"lance:split:69200000" → "100000" // byte offset 69.2MB → row 100000

Write side:
- `splitOffsets()` returns the byte offsets: [0, 34500000, 69200000]
- Iceberg stores these in the DataFile manifest entry
- All Iceberg math (estimatedRowsCount, split validation) works correctly
because these are real byte positions

Read side:
- Iceberg calls `ReadBuilder.split(start=34500000, length=34700000)`
- Our reader opens the Lance file, reads the metadata map
- Looks up byte offset 34500000 → row 50000
- Looks up next split offset 69200000 → row 100000
- Calls `readAll(projectedNames, List.of(new Range(50000, 100000)), batchSize)`
- Lance natively supports row-range reads via its page table

Changes needed:
- Lance Java SDK: expose `getBytesWritten()` (same as Gap #1)
- iceberg-lance: Change `FileFormat.LANCE("lance", true)` — set splittable to true
- iceberg-lance: LanceFileAppender — record byte-offset-to-row mapping after each flush
- iceberg-lance: LanceFileAppender.splitOffsets() — return byte offset list
- iceberg-lance: ReadBuilderWrapper.split() — store start/length, translate to row range on build()
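The read-side translation above can be sketched with a sorted map, assuming (as Iceberg's offset-aware split iterator guarantees) that each split starts exactly on a recorded offset; the `SplitTranslator` name and `long[]` range shape are illustrative:

```java
import java.util.TreeMap;

// Sketch: translate an Iceberg byte-offset split into a Lance row range
// using the byte-offset -> first-row mapping stored in schema metadata.
class SplitTranslator {
  private final TreeMap<Long, Long> offsetToRow = new TreeMap<>();

  void add(long byteOffset, long firstRow) {
    offsetToRow.put(byteOffset, firstRow);
  }

  /** Returns {startRow, endRowExclusive} for the split beginning at start. */
  long[] toRowRange(long start, long totalRows) {
    long startRow = offsetToRow.get(start); // splits begin on recorded offsets
    Long nextOffset = offsetToRow.higherKey(start);
    // The split ends where the next split's rows begin; the last split
    // runs to the end of the file.
    long endRow = (nextOffset == null) ? totalRows : offsetToRow.get(nextOffset);
    return new long[] {startRow, endRow};
  }
}
```

With the example mapping from above, a split starting at byte 34500000 resolves to rows [50000, 100000), which is exactly the `Range` passed to `readAll()`.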

**Why row numbers as offsets won't work:**
Iceberg's `estimatedRowsCount()` in BaseContentScanTask computes:
length / (fileSizeInBytes - firstOffset) * recordCount
This assumes offsets and length are byte positions. Using row numbers (e.g., 50000)
with fileSizeInBytes (e.g., 50000000) gives a fraction of 0.001 instead of ~0.33,
producing wildly wrong estimates that break Spark's task scheduling.

**Why proportional byte-to-row translation won't work:**
Lance is columnar — bytes are organized by columns, not rows. The first 30% of a file's
bytes does not correspond to the first 30% of rows. Proportional math
(offset / fileSizeInBytes * totalRows) is fundamentally inaccurate for columnar formats.

## 4. Predicate Pushdown (filter)

**Current state:** `ReadBuilderWrapper.filter()` is a no-op. The filter expression is
accepted but never applied. All rows are read and filtering happens in Iceberg's residual
evaluator after the fact.

**Why this matters:** Without pushdown, Lance reads all rows from disk even when a query
has a WHERE clause that could eliminate most of them. For a query like
`SELECT * FROM t WHERE id = 42`, Lance would read all rows instead of using its scalar
index or page-level filtering.

**What Lance supports:** Lance has native predicate pushdown through its scan API:
- Scalar indexes (BTREE, BITMAP) for point lookups and range queries
- Zone map filtering at the page level
- Filter expressions passed to the reader

**What's needed in iceberg-lance:**
- Translate Iceberg's `Expression` (from `ReadBuilder.filter()`) to Lance's filter format
- Pass the translated filter to `readAll()` or use Lance's scan API with filter support
- This is purely iceberg-lance work — no Lance SDK changes needed
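A toy sketch of the translation step, under the assumption that Lance's scan API accepts SQL-style filter strings (as its DataFusion-backed Rust scanner does). A real implementation would walk Iceberg's `Expression` tree with an `ExpressionVisitor`; only two operations are shown and all names are illustrative:

```java
// Sketch: render simple predicates as SQL-style filter strings for Lance.
class FilterTranslatorSketch {
  static String eq(String column, Object literal) {
    // Quote string literals; render everything else as-is. A real
    // translator would also handle escaping and type-aware formatting.
    String lit = (literal instanceof String) ? "'" + literal + "'" : String.valueOf(literal);
    return column + " = " + lit;
  }

  static String and(String left, String right) {
    return "(" + left + ") AND (" + right + ")";
  }
}
```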

**Priority:** Medium. Not blocking for initial contribution since Iceberg applies residual
filters anyway (correctness is preserved). But important for production performance.

## 5. Name Mapping (Schema Evolution)

**Current state:** `ReadBuilderWrapper.withNameMapping()` is a no-op. The NameMapping
is accepted but never used.

**Why this matters:** Iceberg supports schema evolution — columns can be renamed, added,
or reordered. When reading files written with an older schema, Iceberg uses NameMapping
to resolve column identity by field ID rather than column name. Without this, reading
files written before a column rename would fail or return wrong data.

**What's needed in iceberg-lance:**
- When a NameMapping is provided, use it to resolve column names during schema conversion
in LanceSchemaUtil
- Map Iceberg field IDs to the column names used in the Lance file, regardless of what
the current schema calls them
- This relies on the `PARQUET:field_id` metadata that LanceSchemaUtil already writes
into Arrow schema fields

**Priority:** Low for initial contribution. Becomes important when users start evolving
schemas on existing Lance-backed Iceberg tables.

## References

- Lance PR #5639: Column Statistics MVP — https://github.com/lancedb/lance/pull/5639
- Lance Discussion #4540: Column Statistics Design — https://github.com/lancedb/lance/discussions/4540
- Lance Issue #5857: Post-MVP Improvements — https://github.com/lancedb/lance/issues/5857
- Lance JNI file_writer.rs: closeNative discards finish() return value
- Iceberg BaseContentScanTask.java — split planning logic, estimatedRowsCount()
- Iceberg OffsetsAwareSplitScanTaskIterator.java — splits tasks by offset boundaries
- Iceberg BaseFile.java lines 551-553 — split offset validation
- Iceberg GenericReader.java — calls ReadBuilder.split(task.start(), task.length())
- Lance LanceFileReader.readAll() — supports row-range reads via Range parameter
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.lance;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

/**
* Reader function for the generic Iceberg {@link Record} data model.
*
* <p>Converts Arrow VectorSchemaRoot batches from Lance files into Iceberg Records.
*/
public class GenericLanceReader {

private GenericLanceReader() {}

/**
* Creates a function that converts Arrow batches into CloseableIterables of Iceberg Records.
*
* @param icebergSchema the Iceberg schema to construct Records with
* @param arrowSchema the Arrow schema of the Lance file (unused, schema comes from batch)
* @param idToConstant constant values to inject (e.g., partition values)
* @return a function that takes a (batch, idToConstant) Entry and produces Records
*/
public static Function<Map.Entry<VectorSchemaRoot, Map<Integer, ?>>, CloseableIterable<Record>>
buildReader(
Schema icebergSchema,
org.apache.arrow.vector.types.pojo.Schema arrowSchema,
Map<Integer, ?> idToConstant) {
return entry -> {
VectorSchemaRoot batch = entry.getKey();
Map<Integer, ?> constants = entry.getValue();

List<Record> records = Lists.newArrayListWithCapacity(batch.getRowCount());
for (int i = 0; i < batch.getRowCount(); i++) {
records.add(LanceArrowConverter.readRow(batch, i, icebergSchema, constants));
}

return new CloseableIterable<Record>() {
@Override
public CloseableIterator<Record> iterator() {
return CloseableIterator.withClose(records.iterator());
}

@Override
public void close() throws IOException {
// batch lifecycle managed by ArrowReader
}
};
};
}
}
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.lance;

import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

/**
* Writer function for the generic Iceberg {@link Record} data model.
*
* <p>Converts Iceberg Records to Maps of column name to value, suitable for the Lance Arrow
* conversion pipeline.
*/
public class GenericLanceWriter {

private GenericLanceWriter() {}

/**
* Creates a function that converts Iceberg Records to a Map representation.
*
* @param icebergSchema the Iceberg schema defining the columns
* @param arrowSchema the corresponding Arrow schema (unused for generic records)
* @return a function mapping Records to column name/value Maps
*/
public static Function<Record, Map<String, Object>> create(
Schema icebergSchema, org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
return record -> LanceArrowConverter.recordToMap(record, icebergSchema);
}
}