From 7a47d3e281998b124cd1d0a4ad0e3ad3ccb80051 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Sun, 15 Mar 2026 18:27:30 +0000 Subject: [PATCH 1/3] feat: add end-to-end ROW (nested struct) column serialization support - Add `Datum::Row(Box)` variant with `as_row()` accessor - Add `get_row()` to `InternalRow` trait with default error impl - Implement `GenericRow::get_row()` and `CompactedRow::get_row()` delegation - Implement `ColumnarRow::get_row()` with Arrow StructArray extraction + OnceLock caching - Add `InnerValueWriter::Row(RowType)` and write path via nested CompactedRowWriter - Add `DataType::Row` arm in `CompactedRowDeserializer` for eager nested decode - Add `InnerFieldGetter::Row` and hook up FieldGetter/ValueWriter pipeline - Handle `Datum::Row` in `resolve_row_types` (C++ bindings) - Add round-trip tests: simple nesting, deep nesting, nullable fields, ROW as primary key Wire format matches Java: varint-length-prefixed blob of a complete CompactedRow. --- bindings/cpp/src/types.rs | 1 + crates/fluss/src/row/binary/binary_writer.rs | 25 +- crates/fluss/src/row/column.rs | 300 +++++++++- .../fluss/src/row/compacted/compacted_row.rs | 4 + .../src/row/compacted/compacted_row_reader.rs | 151 +++++ crates/fluss/src/row/datum.rs | 15 + .../src/row/encode/compacted_key_encoder.rs | 48 +- crates/fluss/src/row/field_getter.rs | 10 +- crates/fluss/src/row/mod.rs | 19 +- .../2026-03-15-row-nested-struct-support.md | 564 ++++++++++++++++++ thoughts/shared/pr_description.md | 31 + .../2026-03-15-row-nested-struct-support.md | 337 +++++++++++ 12 files changed, 1494 insertions(+), 11 deletions(-) create mode 100644 thoughts/shared/plans/2026-03-15-row-nested-struct-support.md create mode 100644 thoughts/shared/pr_description.md create mode 100644 thoughts/shared/research/2026-03-15-row-nested-struct-support.md diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f8efe677..87c4de30 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -351,6 +351,7 @@ pub fn resolve_row_types( Datum::Time(t) => Datum::Time(*t), Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts), Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts), + Datum::Row(r) => Datum::Row(Box::new(resolve_row_types(r, None)?)), }; out.set_field(idx, resolved); } diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index af2765c4..2ab3ce8c 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -17,7 +17,7 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::metadata::DataType; +use crate::metadata::{DataType, RowType}; use crate::row::Datum; use crate::row::binary::BinaryRowFormat; @@ -136,7 +136,7 @@ pub enum InnerValueWriter { Time(u32), // precision (not used in wire format, but kept for consistency) TimestampNtz(u32), // precision TimestampLtz(u32), // precision - // TODO Array, Row + Row(RowType), } /// Accessor for writing the fields/elements of a binary writer during runtime, the @@ -175,6 +175,7 @@ impl InnerValueWriter { // Validation is done at TimestampLTzType construction time Ok(InnerValueWriter::TimestampLtz(t.precision())) } + DataType::Row(row_type) => Ok(InnerValueWriter::Row(row_type.clone())), _ => unimplemented!( "ValueWriter for DataType {:?} is currently not implemented", data_type @@ -237,6 +238,26 @@ impl InnerValueWriter { (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => { writer.write_timestamp_ltz(ts, *p); } + (InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { + use crate::row::compacted::CompactedRowWriter; + let field_count = row_type.fields().len(); + let mut nested = CompactedRowWriter::new(field_count); + for (i, field) in row_type.fields().iter().enumerate() { + let datum = &inner_row.values[i]; + if datum.is_null() { + if field.data_type.is_nullable() { + nested.set_null_at(i); + } + } else { + let vw = + InnerValueWriter::create_inner_value_writer(&field.data_type, None) + .expect("create_inner_value_writer failed for nested row field"); + vw.write_value(&mut nested, i, datum) + .expect("write_value failed for nested row field"); + } + } + writer.write_bytes(nested.buffer()); + } _ => { return Err(IllegalArgument { message: format!("{self:?} used to write value {value:?}"), diff --git a/crates/fluss/src/row/column.rs b/crates/fluss/src/row/column.rs index c07fe97c..df0b9e6c 100644 --- a/crates/fluss/src/row/column.rs +++ b/crates/fluss/src/row/column.rs @@ -17,8 +17,8 @@ use crate::error::Error::IllegalArgument; use crate::error::Result; -use crate::row::InternalRow; -use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; +use crate::row::{GenericRow, InternalRow}; +use crate::row::datum::{Date, Datum, Time, TimestampLtz, TimestampNtz}; use arrow::array::{Array, AsArray, BinaryArray, RecordBatch, StringArray}; use arrow::datatypes::{ DataType as ArrowDataType, Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, @@ -32,25 +32,33 @@ use std::sync::Arc; pub struct ColumnarRow { record_batch: Arc, row_id: usize, + nested_rows: Vec>>, } impl ColumnarRow { pub fn new(batch: Arc) -> Self { + let num_cols = batch.num_columns(); ColumnarRow { record_batch: batch, row_id: 0, + nested_rows: (0..num_cols).map(|_| std::sync::OnceLock::new()).collect(), } } pub fn new_with_row_id(bach: Arc, row_id: usize) -> Self { + let num_cols = bach.num_columns(); ColumnarRow { record_batch: bach, row_id, + nested_rows: (0..num_cols).map(|_| std::sync::OnceLock::new()).collect(), } } pub fn set_row_id(&mut self, row_id: usize) { - self.row_id = row_id + self.row_id = row_id; + for lock in &mut self.nested_rows { + *lock = std::sync::OnceLock::new(); + } } pub fn get_row_id(&self) -> usize { @@ -209,6 +217,168 @@ impl ColumnarRow { }), } } + + /// Extract a `GenericRow<'static>` from a column in the RecordBatch at the given row_id. + fn extract_struct_at( + batch: &RecordBatch, + pos: usize, + row_id: usize, + ) -> Result> { + let col = batch.column(pos); + Self::extract_struct_from_array(col.as_ref(), row_id) + } + + /// Recursively extract a `GenericRow<'static>` from a `StructArray` at row_id. + fn extract_struct_from_array(array: &dyn Array, row_id: usize) -> Result> { + use arrow::array::StructArray; + let sa = array + .as_any() + .downcast_ref::() + .ok_or_else(|| IllegalArgument { + message: format!("expected StructArray, got {:?}", array.data_type()), + })?; + let mut values = Vec::with_capacity(sa.num_columns()); + for i in 0..sa.num_columns() { + let child = sa.column(i); + values.push(Self::arrow_value_to_datum(child.as_ref(), row_id)?); + } + Ok(GenericRow { values }) + } + + /// Convert a single element at `row_id` in an Arrow array to a `Datum<'static>`. + fn arrow_value_to_datum(array: &dyn Array, row_id: usize) -> Result> { + use arrow::array::{ + BooleanArray, Decimal128Array, Float32Array, Float64Array, Int8Array, Int16Array, + Int32Array, Int64Array, Time32MillisecondArray, Time32SecondArray, + Time64MicrosecondArray, Time64NanosecondArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, + }; + use crate::row::Decimal; + + if array.is_null(row_id) { + return Ok(Datum::Null); + } + + match array.data_type() { + ArrowDataType::Boolean => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Bool(a.value(row_id))) + } + ArrowDataType::Int8 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int8(a.value(row_id))) + } + ArrowDataType::Int16 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int16(a.value(row_id))) + } + ArrowDataType::Int32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int32(a.value(row_id))) + } + ArrowDataType::Int64 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Int64(a.value(row_id))) + } + ArrowDataType::Float32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Float32(a.value(row_id).into())) + } + ArrowDataType::Float64 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Float64(a.value(row_id).into())) + } + ArrowDataType::Utf8 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::String(std::borrow::Cow::Owned(a.value(row_id).to_owned()))) + } + ArrowDataType::Binary => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Blob(std::borrow::Cow::Owned(a.value(row_id).to_vec()))) + } + ArrowDataType::Decimal128(p, s) => { + let (p, s) = (*p, *s); + let a = array.as_any().downcast_ref::().unwrap(); + let i128_val = a.value(row_id); + Ok(Datum::Decimal(Decimal::from_arrow_decimal128( + i128_val, + s as i64, + p as u32, + s as u32, + )?)) + } + ArrowDataType::Date32 => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Date(Date::new(a.value(row_id)))) + } + ArrowDataType::Time32(TimeUnit::Second) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new(a.value(row_id) * 1000))) + } + ArrowDataType::Time32(TimeUnit::Millisecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new(a.value(row_id)))) + } + ArrowDataType::Time64(TimeUnit::Microsecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new((a.value(row_id) / 1000) as i32))) + } + ArrowDataType::Time64(TimeUnit::Nanosecond) => { + let a = array.as_any().downcast_ref::().unwrap(); + Ok(Datum::Time(Time::new((a.value(row_id) / 1_000_000) as i32))) + } + ArrowDataType::Timestamp(time_unit, tz) => { + let value: i64 = match time_unit { + TimeUnit::Second => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Millisecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Microsecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + TimeUnit::Nanosecond => { + array.as_any().downcast_ref::().unwrap().value(row_id) + } + }; + let (millis, nanos) = match time_unit { + TimeUnit::Second => (value * 1000, 0i32), + TimeUnit::Millisecond => (value, 0i32), + TimeUnit::Microsecond => { + let millis = value.div_euclid(1000); + let nanos = (value.rem_euclid(1000) * 1000) as i32; + (millis, nanos) + } + TimeUnit::Nanosecond => { + let millis = value.div_euclid(1_000_000); + let nanos = value.rem_euclid(1_000_000) as i32; + (millis, nanos) + } + }; + if tz.is_some() { + if nanos == 0 { + Ok(Datum::TimestampLtz(TimestampLtz::new(millis))) + } else { + Ok(Datum::TimestampLtz(TimestampLtz::from_millis_nanos(millis, nanos)?)) + } + } else if nanos == 0 { + Ok(Datum::TimestampNtz(TimestampNtz::new(millis))) + } else { + Ok(Datum::TimestampNtz(TimestampNtz::from_millis_nanos(millis, nanos)?)) + } + } + ArrowDataType::Struct(_) => { + let nested = Self::extract_struct_from_array(array, row_id)?; + Ok(Datum::Row(Box::new(nested))) + } + other => Err(IllegalArgument { + message: format!( + "unsupported Arrow data type for nested row extraction: {other:?}" + ), + }), + } + } } impl InternalRow for ColumnarRow { @@ -407,6 +577,18 @@ impl InternalRow for ColumnarRow { })? .value(self.row_id)) } + + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + let lock = self.nested_rows.get(pos).ok_or_else(|| IllegalArgument { + message: format!("column index {pos} out of bounds for get_row"), + })?; + let batch = Arc::clone(&self.record_batch); + let row_id = self.row_id; + Ok(lock.get_or_init(|| { + Self::extract_struct_at(&batch, pos, row_id) + .expect("failed to extract nested row from StructArray") + })) + } } #[cfg(test)] @@ -414,9 +596,9 @@ mod tests { use super::*; use arrow::array::{ BinaryArray, BooleanArray, Decimal128Array, Float32Array, Float64Array, Int8Array, - Int16Array, Int32Array, Int64Array, StringArray, + Int16Array, Int32Array, Int64Array, StringArray, StructArray, }; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{DataType, Field, Fields, Schema}; #[test] fn columnar_row_reads_values() { @@ -533,4 +715,112 @@ mod tests { .unwrap() ); } + + fn make_struct_batch( + field_name: &str, + child_fields: Fields, + child_arrays: Vec>, + _num_rows: usize, + ) -> Arc { + let struct_array = StructArray::new(child_fields.clone(), child_arrays, None); + let schema = Arc::new(Schema::new(vec![Field::new( + field_name, + DataType::Struct(child_fields), + false, + )])); + Arc::new( + RecordBatch::try_new(schema, vec![Arc::new(struct_array)]) + .expect("record batch"), + ) + } + + #[test] + fn columnar_row_reads_nested_row() { + // Build a RecordBatch with a Struct column: {i32, string} + let child_fields = Fields::from(vec![ + Field::new("x", DataType::Int32, false), + Field::new("s", DataType::Utf8, false), + ]); + let child_arrays: Vec> = vec![ + Arc::new(Int32Array::from(vec![42, 99])), + Arc::new(StringArray::from(vec!["hello", "world"])), + ]; + let batch = make_struct_batch("nested", child_fields, child_arrays, 2); + + let mut row = ColumnarRow::new(batch); + + // row_id = 0 + let nested = row.get_row(0).unwrap(); + assert_eq!(nested.get_field_count(), 2); + assert_eq!(nested.get_int(0).unwrap(), 42); + assert_eq!(nested.get_string(1).unwrap(), "hello"); + + // row_id = 1 + row.set_row_id(1); + let nested = row.get_row(0).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 99); + assert_eq!(nested.get_string(1).unwrap(), "world"); + } + + #[test] + fn columnar_row_reads_deeply_nested_row() { + // Build: outer struct { i32, inner struct { string } } + let inner_fields = Fields::from(vec![Field::new("s", DataType::Utf8, false)]); + let inner_array = Arc::new(StructArray::new( + inner_fields.clone(), + vec![Arc::new(StringArray::from(vec!["deep", "deeper"])) as Arc], + None, + )); + + let outer_fields = Fields::from(vec![ + Field::new("n", DataType::Int32, false), + Field::new("inner", DataType::Struct(inner_fields), false), + ]); + let outer_array = Arc::new(StructArray::new( + outer_fields.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2])) as Arc, + inner_array as Arc, + ], + None, + )); + + let schema = Arc::new(Schema::new(vec![Field::new( + "outer", + DataType::Struct(outer_fields), + false, + )])); + let batch = Arc::new( + RecordBatch::try_new(schema, vec![outer_array]).expect("record batch"), + ); + + let row = ColumnarRow::new(batch); + + // Access outer struct at column 0, row 0 + let outer = row.get_row(0).unwrap(); + assert_eq!(outer.get_int(0).unwrap(), 1); + + // Access inner struct (column 1 of outer) + let inner = outer.get_row(1).unwrap(); + assert_eq!(inner.get_string(0).unwrap(), "deep"); + } + + #[test] + fn columnar_row_get_row_cache_invalidated_on_set_row_id() { + let child_fields = Fields::from(vec![Field::new("x", DataType::Int32, false)]); + let child_arrays: Vec> = + vec![Arc::new(Int32Array::from(vec![10, 20]))]; + let batch = make_struct_batch("s", child_fields, child_arrays, 2); + + let mut row = ColumnarRow::new(batch); + + // row_id = 0: nested x = 10 + let nested_0 = row.get_row(0).unwrap(); + assert_eq!(nested_0.get_int(0).unwrap(), 10); + + // After set_row_id(1), cache is cleared → nested x = 20 + row.set_row_id(1); + let nested_1 = row.get_row(0).unwrap(); + assert_eq!(nested_1.get_int(0).unwrap(), 20); + } } diff --git a/crates/fluss/src/row/compacted/compacted_row.rs b/crates/fluss/src/row/compacted/compacted_row.rs index 918ebdfd..5481ed4e 100644 --- a/crates/fluss/src/row/compacted/compacted_row.rs +++ b/crates/fluss/src/row/compacted/compacted_row.rs @@ -160,6 +160,10 @@ impl<'a> InternalRow for CompactedRow<'a> { self.decoded_row().get_bytes(pos) } + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + self.decoded_row().get_row(pos) + } + fn as_encoded_bytes(&self, write_format: WriteFormat) -> Option<&[u8]> { match write_format { WriteFormat::CompactedKv => Some(self.as_bytes()), diff --git a/crates/fluss/src/row/compacted/compacted_row_reader.rs b/crates/fluss/src/row/compacted/compacted_row_reader.rs index 00e53aa1..a8b1638d 100644 --- a/crates/fluss/src/row/compacted/compacted_row_reader.rs +++ b/crates/fluss/src/row/compacted/compacted_row_reader.rs @@ -160,6 +160,18 @@ impl<'a> CompactedRowDeserializer<'a> { (Datum::TimestampLtz(timestamp_ltz), next) } } + DataType::Row(row_type) => { + let (nested_bytes, next) = reader.read_bytes(cursor); + let nested_reader = CompactedRowReader::new( + row_type.fields().len(), + nested_bytes, + 0, + nested_bytes.len(), + ); + let nested_deser = CompactedRowDeserializer::new_from_owned(row_type.clone()); + let nested_row = nested_deser.deserialize(&nested_reader); + (Datum::Row(Box::new(nested_row)), next) + } _ => { panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); } @@ -286,3 +298,142 @@ impl<'a> CompactedRowReader<'a> { (s, next_pos) } } + +#[cfg(test)] +mod row_type_tests { + use crate::metadata::{DataType, DataTypes, RowType}; + use crate::row::compacted::compacted_row_reader::{CompactedRowDeserializer, CompactedRowReader}; + use crate::row::compacted::compacted_row_writer::CompactedRowWriter; + use crate::row::binary::ValueWriter; + use crate::row::field_getter::FieldGetter; + use crate::row::{Datum, GenericRow, InternalRow}; + + fn round_trip(outer_row_type: &RowType, outer_row: &GenericRow, verify: F) + where + F: FnOnce(&GenericRow), + { + // Write + let field_getters = FieldGetter::create_field_getters(outer_row_type); + let value_writers: Vec = outer_row_type + .fields() + .iter() + .map(|f| ValueWriter::create_value_writer(f.data_type(), None).unwrap()) + .collect(); + let mut writer = CompactedRowWriter::new(outer_row_type.fields().len()); + for (i, (getter, vw)) in field_getters.iter().zip(value_writers.iter()).enumerate() { + let datum = getter.get_field(outer_row as &dyn InternalRow).unwrap(); + vw.write_value(&mut writer, i, &datum).unwrap(); + } + let bytes = writer.to_bytes(); + + // Read + let deser = CompactedRowDeserializer::new(outer_row_type); + let reader = CompactedRowReader::new( + outer_row_type.fields().len(), + bytes.as_ref(), + 0, + bytes.len(), + ); + let result = deser.deserialize(&reader); + verify(&result); + } + + #[test] + fn test_row_simple_nesting() { + // ROW nested inside an outer row + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 1_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + round_trip(&outer_row_type, &outer, |result| { + assert_eq!(result.get_int(0).unwrap(), 1); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 42); + assert_eq!(nested.get_string(1).unwrap(), "hello"); + }); + } + + #[test] + fn test_row_deep_nesting() { + // ROW> — two levels of nesting + let inner_inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int()], + vec!["n"], + ); + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_inner_row_type.clone())], + vec!["inner"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_row_type.clone())], + vec!["outer"], + ); + + let mut innermost = GenericRow::new(1); + innermost.set_field(0, 99_i32); + + let mut middle = GenericRow::new(1); + middle.set_field(0, Datum::Row(Box::new(innermost))); + + let mut outer = GenericRow::new(1); + outer.set_field(0, Datum::Row(Box::new(middle))); + + round_trip(&outer_row_type, &outer, |result| { + let mid = result.get_row(0).unwrap(); + let inner = mid.get_row(0).unwrap(); + assert_eq!(inner.get_int(0).unwrap(), 99); + }); + } + + #[test] + fn test_row_with_nullable_fields() { + // Outer nullable ROW column; nested row with a nullable STRING field set to null + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "optional_name"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["k", "nested"], + ); + + // Case 1: non-null nested row with a null field inside + let mut inner = GenericRow::new(2); + inner.set_field(0, 7_i32); + inner.set_field(1, Datum::Null); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 10_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + round_trip(&outer_row_type, &outer, |result| { + assert_eq!(result.get_int(0).unwrap(), 10); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 7); + assert!(nested.is_null_at(1).unwrap()); + }); + + // Case 2: outer ROW column is null + let mut outer_null = GenericRow::new(2); + outer_null.set_field(0, 20_i32); + outer_null.set_field(1, Datum::Null); + + round_trip(&outer_row_type, &outer_null, |result2| { + assert_eq!(result2.get_int(0).unwrap(), 20); + assert!(result2.is_null_at(1).unwrap()); + }); + } +} diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs index 9b2e80a6..7fe3d110 100644 --- a/crates/fluss/src/row/datum.rs +++ b/crates/fluss/src/row/datum.rs @@ -18,6 +18,7 @@ use crate::error::Error::RowConvertError; use crate::error::Result; use crate::row::Decimal; +use crate::row::GenericRow; use arrow::array::{ ArrayBuilder, BinaryBuilder, BooleanBuilder, Date32Builder, Decimal128Builder, FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, Int16Builder, @@ -68,6 +69,8 @@ pub enum Datum<'a> { TimestampNtz(TimestampNtz), #[display("{0}")] TimestampLtz(TimestampLtz), + #[display("{0:?}")] + Row(Box>), } impl Datum<'_> { @@ -123,6 +126,13 @@ impl Datum<'_> { _ => panic!("not a timestamp ltz: {self:?}"), } } + + pub fn as_row(&self) -> &GenericRow<'_> { + match self { + Self::Row(r) => r.as_ref(), + _ => panic!("not a row: {self:?}"), + } + } } // ----------- implement from @@ -742,6 +752,11 @@ impl Datum<'_> { message: "Builder type mismatch for TimestampLtz".to_string(), }); } + Datum::Row(_) => { + return Err(RowConvertError { + message: "append_to is not supported for Row type".to_string(), + }); + } } Err(RowConvertError { diff --git a/crates/fluss/src/row/encode/compacted_key_encoder.rs b/crates/fluss/src/row/encode/compacted_key_encoder.rs index d201450b..238def39 100644 --- a/crates/fluss/src/row/encode/compacted_key_encoder.rs +++ b/crates/fluss/src/row/encode/compacted_key_encoder.rs @@ -104,7 +104,7 @@ impl KeyEncoder for CompactedKeyEncoder { #[cfg(test)] mod tests { use super::*; - use crate::metadata::DataTypes; + use crate::metadata::{DataType, DataTypes}; use crate::row::{Datum, GenericRow}; pub fn for_test_row_type(row_type: &RowType) -> CompactedKeyEncoder { @@ -355,4 +355,50 @@ mod tests { encoded.iter().as_slice() ); } + + #[test] + fn test_row_as_primary_key() { + // ROW as a primary key column + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let row_type = RowType::with_data_types_and_field_names( + vec![ + DataTypes::int(), + DataType::Row(inner_row_type.clone()), + ], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut row = GenericRow::new(2); + row.set_field(0, 1_i32); + row.set_field(1, Datum::Row(Box::new(inner))); + + let mut encoder = for_test_row_type(&row_type); + let encoded = encoder.encode_key(&row).unwrap(); + + // Verify it encodes without error and produces non-empty bytes + assert!(!encoded.is_empty()); + + // Encode the same row again to verify determinism + let encoded2 = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded, encoded2); + + // Encode a different nested row and verify different output + let mut inner2 = GenericRow::new(2); + inner2.set_field(0, 99_i32); + inner2.set_field(1, "world"); + + let mut row2 = GenericRow::new(2); + row2.set_field(0, 1_i32); + row2.set_field(1, Datum::Row(Box::new(inner2))); + + let encoded3 = encoder.encode_key(&row2).unwrap(); + assert_ne!(encoded, encoded3); + } } diff --git a/crates/fluss/src/row/field_getter.rs b/crates/fluss/src/row/field_getter.rs index d6b9fc94..63404478 100644 --- a/crates/fluss/src/row/field_getter.rs +++ b/crates/fluss/src/row/field_getter.rs @@ -82,6 +82,7 @@ impl FieldGetter { pos, precision: t.precision(), }, + DataType::Row(_) => InnerFieldGetter::Row { pos }, _ => unimplemented!("DataType {:?} is currently unimplemented", data_type), }; @@ -149,6 +150,9 @@ pub enum InnerFieldGetter { pos: usize, precision: u32, }, + Row { + pos: usize, + }, } impl InnerFieldGetter { @@ -177,7 +181,8 @@ impl InnerFieldGetter { } InnerFieldGetter::TimestampLtz { pos, precision } => { Datum::TimestampLtz(row.get_timestamp_ltz(*pos, *precision)?) - } //TODO Array, Map, Row + } + InnerFieldGetter::Row { pos } => Datum::Row(Box::new(row.get_row(*pos)?.clone())), }) } @@ -198,7 +203,8 @@ impl InnerFieldGetter { | Self::Date { pos } | Self::Time { pos } | Self::Timestamp { pos, .. } - | Self::TimestampLtz { pos, .. } => *pos, + | Self::TimestampLtz { pos, .. } + | Self::Row { pos } => *pos, } } } diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs index ef99ba29..39048df1 100644 --- a/crates/fluss/src/row/mod.rs +++ b/crates/fluss/src/row/mod.rs @@ -29,6 +29,7 @@ mod row_decoder; use crate::client::WriteFormat; use bytes::Bytes; +use serde::Serialize; pub use column::*; pub use compacted::CompactedRow; pub use datum::*; @@ -119,13 +120,20 @@ pub trait InternalRow: Send + Sync { /// Returns the binary value at the given position fn get_bytes(&self, pos: usize) -> Result<&[u8]>; + /// Returns the nested row value at the given position + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + Err(crate::error::Error::IllegalArgument { + message: format!("get_row not supported at position {pos}"), + }) + } + /// Returns encoded bytes if already encoded fn as_encoded_bytes(&self, _write_format: WriteFormat) -> Option<&[u8]> { None } } -#[derive(Debug)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] pub struct GenericRow<'a> { pub values: Vec>, } @@ -274,6 +282,15 @@ impl<'a> InternalRow for GenericRow<'a> { }), } } + + fn get_row(&self, pos: usize) -> Result<&GenericRow<'_>> { + match self.get_value(pos)? { + Datum::Row(r) => Ok(r.as_ref()), + other => Err(IllegalArgument { + message: format!("type mismatch at position {pos}: expected Row, got {other:?}"), + }), + } + } } impl<'a> GenericRow<'a> { diff --git a/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md b/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md new file mode 100644 index 00000000..abd82f3a --- /dev/null +++ b/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md @@ -0,0 +1,564 @@ +# ROW (Nested Struct) Column End-to-End Support + +## Overview + +Add end-to-end serialization support for `DataType::Row(RowType)` columns. The schema layer (type definition, JSON serde, Arrow conversion) is already complete. What is missing is the entire row-serialization stack: `Datum` has no `Row` variant, `InternalRow` has no `get_row()`, `CompactedRowWriter`/`CompactedRowDeserializer` panic on `DataType::Row`, and `FieldGetter`/`ValueWriter` hit `unimplemented!()`. + +Wire format: identical to `String`/`Bytes` — a varint-length-prefixed blob where the blob is a complete compacted row (null-bit header + fields). No new binary primitives needed. + +**Java reference verified**: `CompactedRowWriter.java:339-346` — `writeRow` converts the inner row to bytes via `RowSerializer.toBinaryRow(value)` (which serializes in CompactedRow format) then calls `write(length, segments, offset)` which is `writeInt(len)` + raw bytes, identical to `writeBytes`. `CompactedRowReader.java:372-378` — `readRow` calls `readInt()` for the length, then creates a `CompactedRow` pointing into the buffer at the current position (zero-copy lazy decode). The Rust plan uses eager decode to `GenericRow` instead of lazy decode, which is simpler and correct. + +## Current State Analysis + +- `DataType::Row(RowType)`, `RowType { fields: Vec }` — fully defined at `metadata/datatype.rs:918` +- JSON serde and Arrow `Struct` conversion — already implemented +- `Datum` enum (`datum.rs:39-71`) — 15 scalar variants, no `Row` +- `InternalRow` trait (`mod.rs:61-126`) — 18 typed getters, no `get_row()` +- `CompactedRowDeserializer::deserialize` (`compacted_row_reader.rs:163-165`) — `_ => panic!(...)` +- `CompactedRowWriter` — no `write_row` method +- `FieldGetter::create` (`field_getter.rs:85`) — `_ => unimplemented!(...)` +- `InnerFieldGetter::get_field` (`field_getter.rs:180`) — `//TODO Array, Map, Row` +- `InnerValueWriter` (`binary_writer.rs:139`) — `// TODO Array, Row` +- `InnerValueWriter::create_inner_value_writer` (`binary_writer.rs:178`) — `_ => unimplemented!(...)` + +## Desired End State + +After this plan, the following must work: + +```rust +// Build and serialize a row containing a nested ROW +let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], +); +let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["id", "nested"], +); + +let mut inner = GenericRow::new(2); +inner.set_field(0, 42_i32); +inner.set_field(1, "hello"); + +let mut outer = GenericRow::new(2); +outer.set_field(0, 1_i32); +outer.set_field(1, Datum::Row(Box::new(inner))); + +// Write +let mut writer = CompactedRowWriter::new(2); +// ... (via FieldGetter + ValueWriter) + +// Read back via CompactedRowDeserializer and assert values match +``` + +Tests must pass: `cargo test -p fluss` + +## Key Discoveries + +- `RowType` derives `Debug, Clone, PartialEq, Eq, Hash, Serialize` — safe to store in `InnerValueWriter::Row(RowType)` (`datatype.rs:917`) +- `CompactedRow` delegates all `InternalRow` getters through `decoded_row()` → `&GenericRow` (`compacted_row.rs:71-74`). Adding `get_row` to `CompactedRow` follows this same delegation pattern. +- `CompactedRowReader::read_bytes` at line 275 returns `(&'a [u8], usize)` — the slice lifetime `'a` ties to the underlying data buffer, so nested rows can be deserialized with matching lifetimes. +- `GenericRow` currently derives only `Debug`. Its field `values: Vec>` means all the needed traits (`Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize`) are derivable because `Vec` derives them when `T` does and `Datum` already implements all of them. +- `parse_display::Display` on `Datum` uses `#[display("...")]` per variant — adding `#[display("ROW({0:?})")]` on `Datum::Row` satisfies the Display derive requirement. +- `ColumnarRow::get_row` would require extracting a `StructArray` sub-row — this is non-trivial and is explicitly excluded from this plan. + +## What We're NOT Doing + +- Array or Map type support (same pattern, separate task) +- `ColumnarRow::get_row` — requires Arrow `StructArray` extraction, separate sub-task +- `Datum::append_to` for `Row` (Arrow `StructBuilder`) — not needed for the serialization path +- Extending the Java-compat hex test (`test_all_data_types_java_compatible`) in `CompactedKeyEncoder` for ROW (needs Java reference hex data from `encoded_key.hex`) +- Performance optimization of `InnerValueWriter::Row` (currently creates `InnerValueWriter` per-field per-call; a cache can be added later) + +## Key Encoder: No Code Changes Needed + +`CompactedKeyEncoder` (`compacted_key_encoder.rs`) uses `FieldGetter::create` + `CompactedKeyWriter::create_value_writer` (which delegates to `ValueWriter::create_value_writer`). `CompactedKeyWriter` implements `BinaryWriter` by delegating all methods (including `write_bytes`) to `CompactedRowWriter`. Once FieldGetter and ValueWriter support Row (Phase 2), the key encoder works automatically for ROW primary key columns — no additional code changes needed. A test is included in Phase 3. + +## Implementation Approach + +Work bottom-up: type system first, then wire up write/read, then FieldGetter/ValueWriter, then tests. Each phase compiles cleanly before proceeding. + +--- + +## Phase 1: Core Type System + +### Overview +Add `Datum::Row` variant and derive required traits on `GenericRow`. Add `get_row()` to `InternalRow` and implement it on `GenericRow` and `CompactedRow`. Leave `ColumnarRow` as `unimplemented!()`. + +### Changes Required + +#### 1. `GenericRow` — add derives +**File**: `crates/fluss/src/row/mod.rs` + +Add `Clone, PartialEq, Eq, PartialOrd, Ord, Hash` to `GenericRow`'s derive. `Serialize` is also needed for `Datum::Row`. + +```rust +// Before: +#[derive(Debug)] +pub struct GenericRow<'a> { + pub values: Vec>, +} + +// After: +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +pub struct GenericRow<'a> { + pub values: Vec>, +} +``` + +Note: `Serialize` is required because `Datum` derives `Serialize` (serde). Adding `Datum::Row(Box>)` means the serde derive on `Datum` requires `GenericRow<'a>: Serialize`. The recursive type (`Datum` → `GenericRow` → `Vec`) is handled correctly by serde through `Box`. Add `use serde::Serialize;` to the imports in `mod.rs` (serde is already a dependency — `datum.rs` imports it at line 33). + +#### 2. `Datum::Row` variant +**File**: `crates/fluss/src/row/datum.rs` + +```rust +// In the Datum enum, after TimestampLtz: +#[display("ROW({0:?})")] +Row(Box>), +``` + +Also add `use crate::row::GenericRow;` at the top of datum.rs (check if it's already imported via `mod.rs` re-exports). + +The `is_null()` method already uses `matches!(self, Datum::Null)` so no change needed there. + +Add an accessor on `Datum`: +```rust +pub fn as_row(&self) -> &GenericRow<'_> { + match self { + Self::Row(r) => r.as_ref(), + _ => panic!("not a row: {self:?}"), + } +} +``` + +#### 3. `InternalRow::get_row` — trait method +**File**: `crates/fluss/src/row/mod.rs` + +Add to the `InternalRow` trait after `get_bytes`: +```rust +/// Returns the nested row value at the given position. +fn get_row(&self, pos: usize) -> Result>; +``` + +#### 4. `GenericRow::get_row` implementation +**File**: `crates/fluss/src/row/mod.rs` + +Add to `impl<'a> InternalRow for GenericRow<'a>`: +```rust +fn get_row(&self, pos: usize) -> Result> { + match self.get_value(pos)? { + Datum::Row(r) => Ok(*r.clone()), + other => Err(IllegalArgument { + message: format!( + "type mismatch at position {pos}: expected Row, got {other:?}" + ), + }), + } +} +``` + +Note: This clones the inner `Box` — `*r.clone()` dereferences the cloned Box to get an owned `GenericRow`. Acceptable for correctness; optimize later if needed. + +#### 5. `CompactedRow::get_row` implementation +**File**: `crates/fluss/src/row/compacted/compacted_row.rs` + +Following the same delegation pattern as all other getters: +```rust +fn get_row(&self, pos: usize) -> Result> { + self.decoded_row().get_row(pos) +} +``` + +#### 6. `ColumnarRow::get_row` — stub +**File**: `crates/fluss/src/row/column.rs` + +```rust +fn get_row(&self, pos: usize) -> Result> { + unimplemented!("ColumnarRow::get_row is not yet implemented — requires Arrow StructArray extraction") +} +``` + +### Success Criteria + +#### Automated Verification: +- [x] `cargo build -p fluss` compiles with no errors +- [x] `cargo test -p fluss` — all existing tests pass (250 passed) + +--- + +## Phase 2: Serialization Stack + +### Overview +Wire up the write path (`CompactedRowWriter` + `InnerValueWriter`) and the read path (`CompactedRowDeserializer`), then hook up `FieldGetter` and `InnerValueWriter` to expose Row to the encoder pipeline. + +### Changes Required + +#### 1. `InnerValueWriter::Row` variant +**File**: `crates/fluss/src/row/binary/binary_writer.rs` + +```rust +// In InnerValueWriter enum, replacing the TODO comment: +Row(RowType), // schema needed to iterate fields during write +``` + +Remove or update the `// TODO Array, Row` comment. + +#### 2. `InnerValueWriter::create_inner_value_writer` — Row arm +**File**: `crates/fluss/src/row/binary/binary_writer.rs` + +```rust +// In the match, before the _ => unimplemented! arm: +DataType::Row(row_type) => Ok(InnerValueWriter::Row(row_type.clone())), +``` + +Add `use crate::metadata::RowType;` if not already imported. + +#### 3. `InnerValueWriter::write_value` — Row arm +**File**: `crates/fluss/src/row/binary/binary_writer.rs` + +`write_value` is generic `W: BinaryWriter`. The `BinaryWriter` trait has `write_bytes`, so we create a nested `CompactedRowWriter` (always, regardless of outer writer type), serialize the inner row into it, then call `writer.write_bytes(nested.buffer())`. This mirrors Java's `writeRow`: `RowSerializer.toBinaryRow(value)` produces compacted-format bytes, then `write(length, segments, offset)` = `writeInt(len)` + raw bytes = `writeBytes`. + +```rust +// In write_value, before the _ => Err(...) arm: +(InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { + use crate::row::compacted::compacted_row_writer::CompactedRowWriter; + let field_count = row_type.fields().len(); + let mut nested = CompactedRowWriter::new(field_count); + for (i, field) in row_type.fields().iter().enumerate() { + let datum = &inner_row.values[i]; + if datum.is_null() { + if field.data_type.is_nullable() { + nested.set_null_at(i); + } + } else { + let vw = InnerValueWriter::create_inner_value_writer(&field.data_type, None) + .expect("create_inner_value_writer failed for nested row field"); + vw.write_value(&mut nested, i, datum) + .expect("write_value failed for nested row field"); + } + } + writer.write_bytes(nested.buffer()); +} +``` + +Note: `nested.buffer()` = `&self.buffer[..self.position]` = the full compacted row (null-bit header + field bytes). `BinaryWriter::write_bytes` prepends a varint length — this is the complete wire encoding. + +#### 4. `CompactedRowDeserializer::deserialize` — Row arm +**File**: `crates/fluss/src/row/compacted/compacted_row_reader.rs` + +Mirrors Java's `readRow`: `readInt()` (varint length) → slice the buffer → create `CompactedRow.pointTo(...)`. The Rust equivalent uses eager decode instead of lazy `pointTo`. Add before the `_ => panic!(...)` arm: + +```rust +DataType::Row(row_type) => { + // read_bytes returns (&'a [u8], next_cursor) — varint len + raw slice + let (nested_bytes, next) = reader.read_bytes(cursor); + let nested_reader = CompactedRowReader::new( + row_type.fields().len(), + nested_bytes, + 0, + nested_bytes.len(), + ); + // new_from_owned avoids borrowing the local row_type reference across the loop + let nested_deser = CompactedRowDeserializer::new_from_owned(row_type.clone()); + let nested_row = nested_deser.deserialize(&nested_reader); + (Datum::Row(Box::new(nested_row)), next) +} +``` + +Lifetime note: `nested_bytes: &'a [u8]` borrows from the outer buffer (same lifetime `'a`). The returned `GenericRow<'a>` may contain `Cow::Borrowed` string/bytes datums that also borrow from `'a`. The local `nested_deser` is dropped after the block; this is fine because `GenericRow` borrows only from the reader's data buffer, not from the deserializer. + +#### 5. `FieldGetter::create` — Row arm +**File**: `crates/fluss/src/row/field_getter.rs` + +```rust +// In FieldGetter::create match, before the _ => unimplemented!(...) arm: +DataType::Row(_) => InnerFieldGetter::Row { pos }, +``` + +#### 6. `InnerFieldGetter` — Row variant +**File**: `crates/fluss/src/row/field_getter.rs` + +```rust +// In InnerFieldGetter enum, after TimestampLtz: +Row { + pos: usize, +}, +``` + +#### 7. `InnerFieldGetter::get_field` — Row arm +**File**: `crates/fluss/src/row/field_getter.rs` + +```rust +// In get_field match, replacing the //TODO Array, Map, Row comment: +InnerFieldGetter::Row { pos } => { + Datum::Row(Box::new(row.get_row(*pos)?)) +} +``` + +Remove or update the `//TODO Array, Map, Row` comment. + +#### 8. `InnerFieldGetter::pos` — Row arm +**File**: `crates/fluss/src/row/field_getter.rs` + +Add `Row { pos }` to the `pos()` method's catch-all pattern: + +```rust +// In pos() match: +| Self::TimestampLtz { pos, .. } +| Self::Row { pos } => *pos, +``` + +### Success Criteria + +#### Automated Verification: +- [x] `cargo build -p fluss` compiles with no errors +- [x] `cargo test -p fluss` — all existing tests pass + +--- + +## Phase 3: Tests + +### Overview +Add unit tests covering simple nesting, deep nesting, and nullable nested fields. All tests exercise the full round-trip: `GenericRow` → `CompactedRowWriter` (via `ValueWriter`/`FieldGetter`) → bytes → `CompactedRowDeserializer` → `GenericRow`. + +### Changes Required + +#### 1. Round-trip tests for ROW type +**File**: `crates/fluss/src/row/compacted/compacted_row_reader.rs` (in the existing `#[cfg(test)]` module, or add a new test module at the bottom) + +```rust +#[cfg(test)] +mod row_type_tests { + use crate::metadata::{DataTypes, DataType, RowType}; + use crate::row::{Datum, GenericRow}; + use crate::row::binary::{BinaryWriter, ValueWriter}; + use crate::row::compacted::compacted_row_writer::CompactedRowWriter; + use crate::row::compacted::compacted_row_reader::{ + CompactedRowDeserializer, CompactedRowReader, + }; + use crate::row::field_getter::FieldGetter; + + fn round_trip(outer_row_type: &RowType, outer_row: &GenericRow) -> GenericRow { + // Write + let field_getters = FieldGetter::create_field_getters(outer_row_type); + let value_writers: Vec = outer_row_type + .fields() + .iter() + .map(|f| ValueWriter::create_value_writer(f.data_type(), None).unwrap()) + .collect(); + let mut writer = CompactedRowWriter::new(outer_row_type.fields().len()); + for (i, (getter, vw)) in field_getters.iter().zip(value_writers.iter()).enumerate() { + let datum = getter.get_field(outer_row as &dyn crate::row::InternalRow).unwrap(); + vw.write_value(&mut writer, i, &datum).unwrap(); + } + let bytes = writer.to_bytes(); + + // Read + let deser = CompactedRowDeserializer::new(outer_row_type); + let reader = CompactedRowReader::new( + outer_row_type.fields().len(), + bytes.as_ref(), + 0, + bytes.len(), + ); + deser.deserialize(&reader) + } + + #[test] + fn test_row_simple_nesting() { + // ROW nested inside an outer row + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 1_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + let result = round_trip(&outer_row_type, &outer); + + assert_eq!(result.get_int(0).unwrap(), 1); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 42); + assert_eq!(nested.get_string(1).unwrap(), "hello"); + } + + #[test] + fn test_row_deep_nesting() { + // ROW> — two levels of nesting + let inner_inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int()], + vec!["n"], + ); + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_inner_row_type.clone())], + vec!["inner"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataType::Row(inner_row_type.clone())], + vec!["outer"], + ); + + let mut innermost = GenericRow::new(1); + innermost.set_field(0, 99_i32); + + let mut middle = GenericRow::new(1); + middle.set_field(0, Datum::Row(Box::new(innermost))); + + let mut outer = GenericRow::new(1); + outer.set_field(0, Datum::Row(Box::new(middle))); + + let result = round_trip(&outer_row_type, &outer); + + let mid = result.get_row(0).unwrap(); + let inner = mid.get_row(0).unwrap(); + assert_eq!(inner.get_int(0).unwrap(), 99); + } + + #[test] + fn test_row_with_nullable_fields() { + // Outer nullable ROW column; nested row with a nullable STRING field set to null + // DataTypes::string() and RowType::with_data_types_and_field_names both default to nullable=true + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["id", "optional_name"], + ); + let outer_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], + vec!["k", "nested"], + ); + + // Case 1: non-null nested row with a null field inside + let mut inner = GenericRow::new(2); + inner.set_field(0, 7_i32); + inner.set_field(1, Datum::Null); + + let mut outer = GenericRow::new(2); + outer.set_field(0, 10_i32); + outer.set_field(1, Datum::Row(Box::new(inner))); + + let result = round_trip(&outer_row_type, &outer); + assert_eq!(result.get_int(0).unwrap(), 10); + let nested = result.get_row(1).unwrap(); + assert_eq!(nested.get_int(0).unwrap(), 7); + assert!(nested.is_null_at(1).unwrap()); + + // Case 2: outer ROW column is null + let mut outer_null = GenericRow::new(2); + outer_null.set_field(0, 20_i32); + outer_null.set_field(1, Datum::Null); + + let result2 = round_trip(&outer_row_type, &outer_null); + assert_eq!(result2.get_int(0).unwrap(), 20); + assert!(result2.is_null_at(1).unwrap()); + } +} +``` + +Note: `DataTypes::string()` creates `StringType::with_nullable(true)` — nullable by default. `RowType::with_data_types_and_field_names(...)` creates `RowType::with_nullable(true, fields)` — also nullable by default. To make non-nullable types, use the inner type constructors: `StringType::with_nullable(false)`, `RowType::with_nullable(false, fields)`. The tests above use the default nullable types, which is correct for testing null handling. + +#### 2. Key encoder test for ROW +**File**: `crates/fluss/src/row/encode/compacted_key_encoder.rs` (add to the existing `#[cfg(test)] mod tests`) + +This verifies that `CompactedKeyEncoder` works with ROW as a primary key column. No code changes to the key encoder itself — this exercises the FieldGetter + ValueWriter Row support through the key encoder pipeline. + +```rust +#[test] +fn test_row_as_primary_key() { + // ROW as a primary key column + let inner_row_type = RowType::with_data_types_and_field_names( + vec![DataTypes::int(), DataTypes::string()], + vec!["x", "label"], + ); + let row_type = RowType::with_data_types_and_field_names( + vec![ + DataTypes::int(), + DataType::Row(inner_row_type.clone()), + ], + vec!["id", "nested"], + ); + + let mut inner = GenericRow::new(2); + inner.set_field(0, 42_i32); + inner.set_field(1, "hello"); + + let mut row = GenericRow::new(2); + row.set_field(0, 1_i32); + row.set_field(1, Datum::Row(Box::new(inner))); + + let mut encoder = for_test_row_type(&row_type); + let encoded = encoder.encode_key(&row).unwrap(); + + // Verify it encodes without error and produces non-empty bytes + assert!(!encoded.is_empty()); + + // Encode the same row again to verify determinism + let encoded2 = encoder.encode_key(&row).unwrap(); + assert_eq!(encoded, encoded2); + + // Encode a different nested row and verify different output + let mut inner2 = GenericRow::new(2); + inner2.set_field(0, 99_i32); + inner2.set_field(1, "world"); + + let mut row2 = GenericRow::new(2); + row2.set_field(0, 1_i32); + row2.set_field(1, Datum::Row(Box::new(inner2))); + + let encoded3 = encoder.encode_key(&row2).unwrap(); + assert_ne!(encoded, encoded3); +} +``` + +### Success Criteria + +#### Automated Verification: +- [x] `cargo test -p fluss` — all tests pass including the new row tests +- [x] `cargo test -p fluss row_type_tests` — row round-trip tests pass +- [x] `cargo test -p fluss test_row_as_primary_key` — key encoder test passes +- [x] `cargo clippy -p fluss -- -D warnings` — no new warnings + +#### Manual Verification: +- [ ] Run tests and confirm the nested row values round-trip correctly at each nesting level +- [ ] Confirm that the `DataType::Row` arm in `CompactedRowDeserializer` no longer panics + +--- + +## Testing Strategy + +### Unit Tests +- Simple `ROW`: single level, non-null fields +- Deep `ROW>`: two levels, verifies recursion in both write and read +- Nullable outer ROW: verifies null-bit handling for the column-level nullable flag +- Nullable inner field: nested row has a nullable field set to Datum::Null +- ROW as primary key: verifies the key encoder pipeline works with Row via FieldGetter + ValueWriter + +### Not Tested Here (separate tasks) +- `ROW, MAP>` — blocked on Array/Map support +- `ColumnarRow::get_row` — blocked on StructArray extraction +- Java-compat hex test for ROW in `test_all_data_types_java_compatible` — needs Java reference hex data + +--- + +## Migration Notes + +No migration needed — this is additive. No existing wire-format bytes change. + +--- + +## References + +- Research document: `thoughts/shared/research/2026-03-15-row-nested-struct-support.md` +- Java reference — write: `CompactedRowWriter.java:339-346` +- Java reference — read: `CompactedRowReader.java:370-378` +- Wire format template: `compacted_row_writer.rs:115-130` (`write_bytes`/`write_string`) +- Read template: `compacted_row_reader.rs:275-287` (`read_bytes`/`read_string`) diff --git a/thoughts/shared/pr_description.md b/thoughts/shared/pr_description.md new file mode 100644 index 00000000..212e3505 --- /dev/null +++ b/thoughts/shared/pr_description.md @@ -0,0 +1,31 @@ +# PR Description Template + +Use this template to fill out PR descriptions. + +## What problem does this PR solve? + + + +## What changes were made? + + + +## Why this approach? + + + +## Breaking changes / migration notes + + + +## Changelog entry + + + +## How to verify it + + + +- [ ] `cargo test -p fluss-rs` — all tests pass +- [ ] `cargo clippy -p fluss-rs -- -D warnings` — no new warnings +- [ ] Manual: review the diff for correctness against the linked plan/research doc diff --git a/thoughts/shared/research/2026-03-15-row-nested-struct-support.md b/thoughts/shared/research/2026-03-15-row-nested-struct-support.md new file mode 100644 index 00000000..199cda6c --- /dev/null +++ b/thoughts/shared/research/2026-03-15-row-nested-struct-support.md @@ -0,0 +1,337 @@ +--- +date: 2026-03-15T11:29:18Z +researcher: hemanth +git_commit: 7d4bfd663be7d3edf527ffcba56d5c370c67cf20 +branch: main +repository: hemanthsavasere/fluss-rust +topic: "ROW (nested struct) column end-to-end support" +tags: [research, codebase, row-type, datum, compacted-row, internal-row, field-getter, value-writer, key-encoder] +status: complete +last_updated: 2026-03-15 +last_updated_by: hemanth +--- + +# Research: ROW (Nested Struct) Column End-to-End Support + +**Date**: 2026-03-15T11:29:18Z +**Researcher**: hemanth +**Git Commit**: [7d4bfd6](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20) +**Branch**: main +**Repository**: hemanthsavasere/fluss-rust + +--- + +## Research Question + +Add end-to-end support for ROW (nested struct) columns. A nested Row is just a recursively-embedded CompactedRow — the same format as a top-level row, length-prefixed. The `Datum` enum needs a `Row` variant, `InternalRow` needs `get_row()`. Java references: `CompactedRowWriter.java` lines 339–346 and `CompactedRowReader.java` lines 370–378 — it's serialize-to-bytes then `write_bytes`, same as strings. The value writer, field getter, and key encoder need the same additions as the other two types. + +--- + +## Summary + +`DataType::Row(RowType)` is already fully modeled at the schema level (type definition, JSON serde, Arrow type conversion). **Nothing in that layer needs changing.** What is missing is the entire row-serialization stack: `Datum` has no `Row` variant, `InternalRow` has no `get_row()`, `CompactedRowWriter`/`CompactedRowDeserializer` panic on `DataType::Row`, and `FieldGetter`/`ValueWriter` hit `unimplemented!()`. + +The implementation pattern is the same as `String`/`Bytes` in the writer: serialize the nested row to bytes with a recursive `CompactedRowWriter`, then store with `write_bytes()` (varint length prefix + raw bytes). On the read side, `read_bytes()` returns the raw slice, which is recursively wrapped in a `CompactedRow`. No new binary format is needed. + +Array and Map are in the exact same situation — none of the serialization layers implement them either. All three complex types share the same set of `TODO` comments, `unimplemented!()` guards, and `_ => panic!` arms. + +--- + +## Detailed Findings + +### What Is Already Implemented (No Changes Needed) + +| Component | File | Status | +|---|---|---| +| `DataType::Row(RowType)` enum variant | [`metadata/datatype.rs:26-46`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L26-L46) | ✅ Fully defined | +| `RowType` struct with `fields: Vec` | [`metadata/datatype.rs:918`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L918) | ✅ Fully defined | +| `is_nullable` / `as_non_nullable` / `Display` for Row | [`metadata/datatype.rs:49-119`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L49-L119) | ✅ Dispatched | +| JSON serde for `Row` | [`metadata/json_serde.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/json_serde.rs) | ✅ Serialize + deserialize | +| Arrow type conversion (`Row` → `ArrowDataType::Struct`) | [`record/arrow.rs:1054-1076`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/record/arrow.rs#L1054-L1076) | ✅ Implemented | + +### What Needs to Be Implemented + +#### 1. `Datum` Enum — `row/datum.rs:39-71` + +**Current state**: 15 variants covering all scalar types. No `Array`, `Map`, or `Row` variant. The TODO is implicit — the comment `//TODO Array, Map, Row` appears in `field_getter.rs:180` and `binary_writer.rs:139`. + +```rust +// Current variants (datum.rs:39-71): +pub enum Datum<'a> { + Null, Bool(bool), Int8(i8), Int16(i16), Int32(i32), Int64(i64), + Float32(F32), Float64(F64), + String(Str<'a>), // Cow<'a, str> — zero-copy borrow from buffer + Blob(Blob<'a>), // Cow<'a, [u8]> — zero-copy borrow from buffer + Decimal(Decimal), Date(Date), Time(Time), + TimestampNtz(TimestampNtz), TimestampLtz(TimestampLtz), + // No Row variant yet +} +``` + +Need to add a `Row` variant carrying a nested row. The `String`/`Blob` variants use `Cow<'a, …>` for zero-copy borrowing — the Row variant will need a similar lifetime or boxing strategy. + +#### 2. `InternalRow` Trait — `row/mod.rs:61-126` + +**Current state**: 18 typed `get_*` methods (get_boolean through get_bytes). No `get_row()`. + +```rust +// Current trait (mod.rs:61-126) — abridged: +pub trait InternalRow: Send + Sync { + fn get_field_count(&self) -> usize; + fn is_null_at(&self, pos: usize) -> Result; + fn get_boolean(&self, pos: usize) -> Result; + // ... 15 more typed getters ... + fn get_bytes(&self, pos: usize) -> Result<&[u8]>; + // No get_row yet +} +``` + +Need to add: `fn get_row(&self, pos: usize) -> Result<...>` with an appropriate return type (boxed trait object or concrete type). Must be implemented on `GenericRow`, `CompactedRow`, and `ColumnarRow`. + +#### 3. `CompactedRowWriter` + `BinaryWriter` Trait + +**Current state**: `BinaryWriter` trait has TODO stubs for `write_array` and `write_row` commented out at [`binary_writer.rs:70-74`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L70-L74). `CompactedRowWriter` has no `write_row` method. + +The reference pattern in the same writer is `write_bytes` / `write_string` ([`compacted_row_writer.rs:115-130`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_writer.rs#L115-L130)): + +```rust +// write_bytes: varint(len) + raw bytes +fn write_bytes(&mut self, value: &[u8]) { + self.write_int(value.len() as i32); + self.write_raw(value); +} + +// write_string: delegates to write_bytes +fn write_string(&mut self, value: &str) { + self.write_bytes(value.as_ref()); +} +``` + +Java's `CompactedRowWriter.java:339-346` does the same for nested rows: serialize the nested row to bytes, then call `writeBytes`. The Rust implementation should: + +```rust +fn write_row(&mut self, value: &dyn InternalRow, row_type: &RowType) { + let mut nested_writer = CompactedRowWriter::new(row_type.fields().len()); + // ... encode each field of value into nested_writer ... + let bytes = nested_writer.to_bytes(); + self.write_bytes(&bytes); +} +``` + +#### 4. `CompactedRowDeserializer` — `row/compacted/compacted_row_reader.rs:52-171` + +**Current state**: The `deserialize` method's `DataType` match falls through to `panic!` for all unsupported types at [`compacted_row_reader.rs:163-165`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L163-L165): + +```rust +_ => { + panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); +} +``` + +The reference pattern is `read_bytes` / `read_string` ([`compacted_row_reader.rs:275-287`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L275-L287)): + +```rust +// read_bytes: returns (&[u8], next_cursor) +pub fn read_bytes(&self, pos: usize) -> (&[u8], usize) { + let (len, data_pos) = self.read_int(pos); + (&self.segment[data_pos..data_pos + len as usize], data_pos + len as usize) +} + +pub fn read_string(&self, pos: usize) -> (&str, usize) { + let (bytes, next) = self.read_bytes(pos); + (std::str::from_utf8(bytes).unwrap(), next) +} +``` + +Java's `CompactedRowReader.java:370-378` does the same for nested rows: call `readBytes`, then deserialize the result. The Rust implementation should: + +```rust +DataType::Row(row_type) => { + let (nested_bytes, next_cursor) = reader.read_bytes(cursor); + let nested_row = CompactedRow::from_bytes(row_type, nested_bytes); + row.set_field(i, Datum::Row(Box::new(nested_row))); + cursor = next_cursor; +} +``` + +#### 5. `FieldGetter` + `InnerFieldGetter` — `row/field_getter.rs` + +**Current state**: `InnerFieldGetter` has 16 variants ([`field_getter.rs:97-152`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L97-L152)). `create()` hits `unimplemented!()` for unknown types ([`field_getter.rs:85`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L85)). `get_field` has `//TODO Array, Map, Row` at [`field_getter.rs:180`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L180). + +All existing variants follow the same pattern: +```rust +// Example: InnerFieldGetter::String { pos } +InnerFieldGetter::String { pos } => { + Datum::String(Str::from(row.get_string(*pos)?)) +} +``` + +Need to add: +```rust +// In InnerFieldGetter enum: +Row { pos }, // (RowType not needed here since get_row returns the whole row) + +// In create(): +DataType::Row(_) => InnerFieldGetter::Row { pos }, + +// In get_field(): +InnerFieldGetter::Row { pos } => { + Datum::Row(row.get_row(*pos)?) +} +``` + +#### 6. `InnerValueWriter` + `create_inner_value_writer` + `write_value` — `row/binary/binary_writer.rs` + +**Current state**: `InnerValueWriter` has 16 variants ([`binary_writer.rs:122-140`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L122-L140)) with `// TODO Array, Row` at line 139. `create_inner_value_writer` hits `unimplemented!()` at [`binary_writer.rs:178`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L178). `write_value` hits `Err(IllegalArgument)` for mismatched pairs. + +The existing variant pattern (Decimal stores type params): +```rust +// In InnerValueWriter enum: +Decimal(u32, u32), // precision, scale +TimestampNtz(u32), // precision +``` + +For Row, the `RowType` is needed to recursively encode fields: +```rust +// In InnerValueWriter enum: +Row(RowType), // schema of the nested row + +// In create_inner_value_writer(): +DataType::Row(row_type) => InnerValueWriter::Row(row_type.clone()), + +// In write_value(): +(InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { + writer.write_row(inner_row.as_ref(), row_type)?; +} +``` + +#### 7. `CompactedKeyEncoder` — `row/encode/compacted_key_encoder.rs:266-268` + +**Current state**: The Java-compatibility test has explicit TODO comments: +```rust +// TODO: Add support for ARRAY type +// TODO: Add support for MAP type +// TODO: Add support for ROW type +``` + +ROW as a primary key column would flow through `FieldGetter` and `ValueWriter` just like any other type. Once steps 5 and 6 above are done, `CompactedKeyEncoder` should work for Row automatically (since it constructs its `FieldGetter`s and `ValueWriter`s by calling `FieldGetter::create` and `ValueWriter::create_value_writer`). The test just needs the TODO comments replaced with actual test cases. + +--- + +## Wire Format Documentation + +The nested-row wire format follows the same length-prefixed bytes pattern as `String` and `Bytes`: + +``` +[ varint(nested_row_byte_length) ][ nested_row_bytes ... ] +``` + +Where `nested_row_bytes` is a complete compacted row: +``` +[ null-bit-header: ceil(field_count/8) bytes ][ field_0 ][ field_1 ] ... [ field_N ] +``` + +This is recursive — a `Row>` stores: +``` +varint(outer_len) [ + null-bits(1 byte) + varint(inner_len) [ + null-bits(1 byte) + varint(int_value) + ] +] +``` + +No new binary primitives are needed. The existing `write_bytes`/`read_bytes` plus recursive `CompactedRowWriter`/`CompactedRowDeserializer` cover everything. + +--- + +## Code References + +| File | Line(s) | Description | +|---|---|---| +| [`row/datum.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/datum.rs#L39-L71) | 39–71 | `Datum` enum — add `Row` variant here | +| [`row/mod.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/mod.rs#L61-L126) | 61–126 | `InternalRow` trait — add `get_row()` here | +| [`row/compacted/compacted_row_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_writer.rs#L115-L130) | 115–130 | `write_bytes`/`write_string` — reference pattern for `write_row` | +| [`row/compacted/compacted_row_reader.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L163-L165) | 163–165 | `panic!` arm in deserializer — replace with `DataType::Row` arm | +| [`row/compacted/compacted_row_reader.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L275-L287) | 275–287 | `read_bytes`/`read_string` — reference pattern for reading nested rows | +| [`row/field_getter.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L85) | 85 | `unimplemented!()` in `create()` — add `DataType::Row` arm | +| [`row/field_getter.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L97-L180) | 97–180 | `InnerFieldGetter` enum + `get_field` dispatch — add `Row` variant | +| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L70-L74) | 70–74 | `BinaryWriter` trait — uncomment/add `write_row` method | +| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L122-L140) | 122–140 | `InnerValueWriter` enum — add `Row(RowType)` variant | +| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L145-L181) | 145–181 | `create_inner_value_writer` — add `DataType::Row` arm | +| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L184-L247) | 184–247 | `write_value` dispatch — add `(Row(t), Datum::Row(r))` arm | +| [`row/encode/compacted_key_encoder.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/encode/compacted_key_encoder.rs#L266-L268) | 266–268 | TODO comments for Array/Map/Row in Java-compat test | +| [`metadata/datatype.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L918) | 918 | `RowType` struct — already fully defined, no changes needed | +| [`record/arrow.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/record/arrow.rs#L1054-L1076) | 1054–1076 | Arrow `Struct` conversion for Row — already implemented | + +--- + +## Architecture Documentation + +### Existing Patterns That ROW Must Follow + +**String/Bytes in `CompactedRowWriter`** (the write-side template): +1. `write_bytes(value)` = `write_int(len)` + `write_raw(bytes)` (`compacted_row_writer.rs:115-120`) +2. `write_string(value)` = `write_bytes(value.as_ref())` (`compacted_row_writer.rs:128-130`) +3. ROW follows: `write_row(row, type)` = serialize row → bytes, then `write_bytes(bytes)` + +**String/Bytes in `CompactedRowDeserializer`** (the read-side template): +1. `read_bytes(pos)` = `read_int(pos)` → len, return `(segment[pos..pos+len], pos+len)` (`compacted_row_reader.rs:275-281`) +2. `read_string(pos)` = `read_bytes(pos)` + `from_utf8` (`compacted_row_reader.rs:283-287`) +3. ROW follows: `read_bytes(pos)` → raw slice, then `CompactedRow::from_bytes(row_type, slice)` + +**`Decimal(precision, scale)` in `InnerValueWriter`** (template for type params): +- The variant carries type parameters needed at write time +- `InnerValueWriter::Row(RowType)` follows the same pattern — RowType carries the field schema + +**`TimestampNtz(precision)` / `TimestampLtz(precision)` in `InnerFieldGetter`** (template for type params in FieldGetter): +- The variant carries read parameters needed at get time +- `InnerFieldGetter::Row { pos }` is simpler — no extra params needed beyond `pos` since `get_row` returns the whole row + +### Data Flow for ROW Fields (once implemented) + +**Write path**: +``` +GenericRow { field: Datum::Row(inner_row) } + → FieldGetter::Row { pos }.get_field(outer_row) + → Datum::Row(inner_row) + → InnerValueWriter::Row(row_type).write_value(writer, pos, datum) + → CompactedRowWriter::write_row(inner_row, row_type) + → nested CompactedRowWriter serializes inner_row + → write_bytes(nested_bytes) // varint(len) + raw +``` + +**Read path**: +``` +raw bytes from wire + → CompactedRowDeserializer: DataType::Row(row_type) arm + → reader.read_bytes(cursor) // read varint(len) + raw slice + → CompactedRow::from_bytes(row_type, slice) + → store as Datum::Row(nested_compacted_row) + → cached in GenericRow via OnceLock + → outer row.get_row(pos) returns the nested CompactedRow +``` + +--- + +## Planned Tests + +Per the original task description: + +1. **Simple nested row** — `ROW`, single level of nesting +2. **Deeply nested** — `ROW>`, recursive nesting +3. **Nullable fields** — nested row containing nullable fields; outer nullable ROW column +4. **Combination** (separate issue) — `ROW, MAP>` once Array and Map also land + +--- + +## Open Questions + +1. **Return type of `get_row`** — Should it return `Box`, `&dyn InternalRow`, or a concrete type like `CompactedRow`? The lifetime constraints from `GenericRow<'a>` (which stores `Datum<'a>`) may require a lifetime on the return type. + +2. **`Datum::Row` inner type** — `Box>` works but loses the zero-copy property that `Cow<'a, …>` provides for String/Blob. Alternatively, storing the raw bytes in `Datum::Row(Blob<'a>)` and lazily deserializing preserves zero-copy but complicates access patterns. + +3. **`write_row` signature in `BinaryWriter` trait** — Needs `&RowType` to know how many fields and their types for serializing. Whether the encoder carries the `RowType` in `InnerValueWriter::Row(RowType)` or passes it at call time affects the API. + +4. **`ColumnarRow::get_row`** — Extracting a struct sub-row from an Arrow `StructArray` is more complex than the other implementations. This may need its own sub-task. From 13a75d6dc7148444dba95e33d879102652430ad7 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Sun, 15 Mar 2026 18:46:55 +0000 Subject: [PATCH 2/3] chore: remove thoughts/ from tracking and add to .gitignore Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 4 +- .../2026-03-15-row-nested-struct-support.md | 564 ------------------ thoughts/shared/pr_description.md | 31 - .../2026-03-15-row-nested-struct-support.md | 337 ----------- 4 files changed, 3 insertions(+), 933 deletions(-) delete mode 100644 thoughts/shared/plans/2026-03-15-row-nested-struct-support.md delete mode 100644 thoughts/shared/pr_description.md delete mode 100644 thoughts/shared/research/2026-03-15-row-nested-struct-support.md diff --git a/.gitignore b/.gitignore index 5d11a1c0..f16dcbaa 100644 --- a/.gitignore +++ b/.gitignore @@ -53,4 +53,6 @@ website/package-lock.json website/versioned_docs website/versioned_sidebars website/versions.json -website/pnpm-lock.yaml \ No newline at end of file +website/pnpm-lock.yaml + +thoughts/ \ No newline at end of file diff --git a/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md b/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md deleted file mode 100644 index abd82f3a..00000000 --- a/thoughts/shared/plans/2026-03-15-row-nested-struct-support.md +++ /dev/null @@ -1,564 +0,0 @@ -# ROW (Nested Struct) Column End-to-End Support - -## Overview - -Add end-to-end serialization support for `DataType::Row(RowType)` columns. The schema layer (type definition, JSON serde, Arrow conversion) is already complete. What is missing is the entire row-serialization stack: `Datum` has no `Row` variant, `InternalRow` has no `get_row()`, `CompactedRowWriter`/`CompactedRowDeserializer` panic on `DataType::Row`, and `FieldGetter`/`ValueWriter` hit `unimplemented!()`. - -Wire format: identical to `String`/`Bytes` — a varint-length-prefixed blob where the blob is a complete compacted row (null-bit header + fields). No new binary primitives needed. - -**Java reference verified**: `CompactedRowWriter.java:339-346` — `writeRow` converts the inner row to bytes via `RowSerializer.toBinaryRow(value)` (which serializes in CompactedRow format) then calls `write(length, segments, offset)` which is `writeInt(len)` + raw bytes, identical to `writeBytes`. `CompactedRowReader.java:372-378` — `readRow` calls `readInt()` for the length, then creates a `CompactedRow` pointing into the buffer at the current position (zero-copy lazy decode). The Rust plan uses eager decode to `GenericRow` instead of lazy decode, which is simpler and correct. - -## Current State Analysis - -- `DataType::Row(RowType)`, `RowType { fields: Vec }` — fully defined at `metadata/datatype.rs:918` -- JSON serde and Arrow `Struct` conversion — already implemented -- `Datum` enum (`datum.rs:39-71`) — 15 scalar variants, no `Row` -- `InternalRow` trait (`mod.rs:61-126`) — 18 typed getters, no `get_row()` -- `CompactedRowDeserializer::deserialize` (`compacted_row_reader.rs:163-165`) — `_ => panic!(...)` -- `CompactedRowWriter` — no `write_row` method -- `FieldGetter::create` (`field_getter.rs:85`) — `_ => unimplemented!(...)` -- `InnerFieldGetter::get_field` (`field_getter.rs:180`) — `//TODO Array, Map, Row` -- `InnerValueWriter` (`binary_writer.rs:139`) — `// TODO Array, Row` -- `InnerValueWriter::create_inner_value_writer` (`binary_writer.rs:178`) — `_ => unimplemented!(...)` - -## Desired End State - -After this plan, the following must work: - -```rust -// Build and serialize a row containing a nested ROW -let inner_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataTypes::string()], - vec!["x", "label"], -); -let outer_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], - vec!["id", "nested"], -); - -let mut inner = GenericRow::new(2); -inner.set_field(0, 42_i32); -inner.set_field(1, "hello"); - -let mut outer = GenericRow::new(2); -outer.set_field(0, 1_i32); -outer.set_field(1, Datum::Row(Box::new(inner))); - -// Write -let mut writer = CompactedRowWriter::new(2); -// ... (via FieldGetter + ValueWriter) - -// Read back via CompactedRowDeserializer and assert values match -``` - -Tests must pass: `cargo test -p fluss` - -## Key Discoveries - -- `RowType` derives `Debug, Clone, PartialEq, Eq, Hash, Serialize` — safe to store in `InnerValueWriter::Row(RowType)` (`datatype.rs:917`) -- `CompactedRow` delegates all `InternalRow` getters through `decoded_row()` → `&GenericRow` (`compacted_row.rs:71-74`). Adding `get_row` to `CompactedRow` follows this same delegation pattern. -- `CompactedRowReader::read_bytes` at line 275 returns `(&'a [u8], usize)` — the slice lifetime `'a` ties to the underlying data buffer, so nested rows can be deserialized with matching lifetimes. -- `GenericRow` currently derives only `Debug`. Its field `values: Vec>` means all the needed traits (`Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize`) are derivable because `Vec` derives them when `T` does and `Datum` already implements all of them. -- `parse_display::Display` on `Datum` uses `#[display("...")]` per variant — adding `#[display("ROW({0:?})")]` on `Datum::Row` satisfies the Display derive requirement. -- `ColumnarRow::get_row` would require extracting a `StructArray` sub-row — this is non-trivial and is explicitly excluded from this plan. - -## What We're NOT Doing - -- Array or Map type support (same pattern, separate task) -- `ColumnarRow::get_row` — requires Arrow `StructArray` extraction, separate sub-task -- `Datum::append_to` for `Row` (Arrow `StructBuilder`) — not needed for the serialization path -- Extending the Java-compat hex test (`test_all_data_types_java_compatible`) in `CompactedKeyEncoder` for ROW (needs Java reference hex data from `encoded_key.hex`) -- Performance optimization of `InnerValueWriter::Row` (currently creates `InnerValueWriter` per-field per-call; a cache can be added later) - -## Key Encoder: No Code Changes Needed - -`CompactedKeyEncoder` (`compacted_key_encoder.rs`) uses `FieldGetter::create` + `CompactedKeyWriter::create_value_writer` (which delegates to `ValueWriter::create_value_writer`). `CompactedKeyWriter` implements `BinaryWriter` by delegating all methods (including `write_bytes`) to `CompactedRowWriter`. Once FieldGetter and ValueWriter support Row (Phase 2), the key encoder works automatically for ROW primary key columns — no additional code changes needed. A test is included in Phase 3. - -## Implementation Approach - -Work bottom-up: type system first, then wire up write/read, then FieldGetter/ValueWriter, then tests. Each phase compiles cleanly before proceeding. - ---- - -## Phase 1: Core Type System - -### Overview -Add `Datum::Row` variant and derive required traits on `GenericRow`. Add `get_row()` to `InternalRow` and implement it on `GenericRow` and `CompactedRow`. Leave `ColumnarRow` as `unimplemented!()`. - -### Changes Required - -#### 1. `GenericRow` — add derives -**File**: `crates/fluss/src/row/mod.rs` - -Add `Clone, PartialEq, Eq, PartialOrd, Ord, Hash` to `GenericRow`'s derive. `Serialize` is also needed for `Datum::Row`. - -```rust -// Before: -#[derive(Debug)] -pub struct GenericRow<'a> { - pub values: Vec>, -} - -// After: -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] -pub struct GenericRow<'a> { - pub values: Vec>, -} -``` - -Note: `Serialize` is required because `Datum` derives `Serialize` (serde). Adding `Datum::Row(Box>)` means the serde derive on `Datum` requires `GenericRow<'a>: Serialize`. The recursive type (`Datum` → `GenericRow` → `Vec`) is handled correctly by serde through `Box`. Add `use serde::Serialize;` to the imports in `mod.rs` (serde is already a dependency — `datum.rs` imports it at line 33). - -#### 2. `Datum::Row` variant -**File**: `crates/fluss/src/row/datum.rs` - -```rust -// In the Datum enum, after TimestampLtz: -#[display("ROW({0:?})")] -Row(Box>), -``` - -Also add `use crate::row::GenericRow;` at the top of datum.rs (check if it's already imported via `mod.rs` re-exports). - -The `is_null()` method already uses `matches!(self, Datum::Null)` so no change needed there. - -Add an accessor on `Datum`: -```rust -pub fn as_row(&self) -> &GenericRow<'_> { - match self { - Self::Row(r) => r.as_ref(), - _ => panic!("not a row: {self:?}"), - } -} -``` - -#### 3. `InternalRow::get_row` — trait method -**File**: `crates/fluss/src/row/mod.rs` - -Add to the `InternalRow` trait after `get_bytes`: -```rust -/// Returns the nested row value at the given position. -fn get_row(&self, pos: usize) -> Result>; -``` - -#### 4. `GenericRow::get_row` implementation -**File**: `crates/fluss/src/row/mod.rs` - -Add to `impl<'a> InternalRow for GenericRow<'a>`: -```rust -fn get_row(&self, pos: usize) -> Result> { - match self.get_value(pos)? { - Datum::Row(r) => Ok(*r.clone()), - other => Err(IllegalArgument { - message: format!( - "type mismatch at position {pos}: expected Row, got {other:?}" - ), - }), - } -} -``` - -Note: This clones the inner `Box` — `*r.clone()` dereferences the cloned Box to get an owned `GenericRow`. Acceptable for correctness; optimize later if needed. - -#### 5. `CompactedRow::get_row` implementation -**File**: `crates/fluss/src/row/compacted/compacted_row.rs` - -Following the same delegation pattern as all other getters: -```rust -fn get_row(&self, pos: usize) -> Result> { - self.decoded_row().get_row(pos) -} -``` - -#### 6. `ColumnarRow::get_row` — stub -**File**: `crates/fluss/src/row/column.rs` - -```rust -fn get_row(&self, pos: usize) -> Result> { - unimplemented!("ColumnarRow::get_row is not yet implemented — requires Arrow StructArray extraction") -} -``` - -### Success Criteria - -#### Automated Verification: -- [x] `cargo build -p fluss` compiles with no errors -- [x] `cargo test -p fluss` — all existing tests pass (250 passed) - ---- - -## Phase 2: Serialization Stack - -### Overview -Wire up the write path (`CompactedRowWriter` + `InnerValueWriter`) and the read path (`CompactedRowDeserializer`), then hook up `FieldGetter` and `InnerValueWriter` to expose Row to the encoder pipeline. - -### Changes Required - -#### 1. `InnerValueWriter::Row` variant -**File**: `crates/fluss/src/row/binary/binary_writer.rs` - -```rust -// In InnerValueWriter enum, replacing the TODO comment: -Row(RowType), // schema needed to iterate fields during write -``` - -Remove or update the `// TODO Array, Row` comment. - -#### 2. `InnerValueWriter::create_inner_value_writer` — Row arm -**File**: `crates/fluss/src/row/binary/binary_writer.rs` - -```rust -// In the match, before the _ => unimplemented! arm: -DataType::Row(row_type) => Ok(InnerValueWriter::Row(row_type.clone())), -``` - -Add `use crate::metadata::RowType;` if not already imported. - -#### 3. `InnerValueWriter::write_value` — Row arm -**File**: `crates/fluss/src/row/binary/binary_writer.rs` - -`write_value` is generic `W: BinaryWriter`. The `BinaryWriter` trait has `write_bytes`, so we create a nested `CompactedRowWriter` (always, regardless of outer writer type), serialize the inner row into it, then call `writer.write_bytes(nested.buffer())`. This mirrors Java's `writeRow`: `RowSerializer.toBinaryRow(value)` produces compacted-format bytes, then `write(length, segments, offset)` = `writeInt(len)` + raw bytes = `writeBytes`. - -```rust -// In write_value, before the _ => Err(...) arm: -(InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { - use crate::row::compacted::compacted_row_writer::CompactedRowWriter; - let field_count = row_type.fields().len(); - let mut nested = CompactedRowWriter::new(field_count); - for (i, field) in row_type.fields().iter().enumerate() { - let datum = &inner_row.values[i]; - if datum.is_null() { - if field.data_type.is_nullable() { - nested.set_null_at(i); - } - } else { - let vw = InnerValueWriter::create_inner_value_writer(&field.data_type, None) - .expect("create_inner_value_writer failed for nested row field"); - vw.write_value(&mut nested, i, datum) - .expect("write_value failed for nested row field"); - } - } - writer.write_bytes(nested.buffer()); -} -``` - -Note: `nested.buffer()` = `&self.buffer[..self.position]` = the full compacted row (null-bit header + field bytes). `BinaryWriter::write_bytes` prepends a varint length — this is the complete wire encoding. - -#### 4. `CompactedRowDeserializer::deserialize` — Row arm -**File**: `crates/fluss/src/row/compacted/compacted_row_reader.rs` - -Mirrors Java's `readRow`: `readInt()` (varint length) → slice the buffer → create `CompactedRow.pointTo(...)`. The Rust equivalent uses eager decode instead of lazy `pointTo`. Add before the `_ => panic!(...)` arm: - -```rust -DataType::Row(row_type) => { - // read_bytes returns (&'a [u8], next_cursor) — varint len + raw slice - let (nested_bytes, next) = reader.read_bytes(cursor); - let nested_reader = CompactedRowReader::new( - row_type.fields().len(), - nested_bytes, - 0, - nested_bytes.len(), - ); - // new_from_owned avoids borrowing the local row_type reference across the loop - let nested_deser = CompactedRowDeserializer::new_from_owned(row_type.clone()); - let nested_row = nested_deser.deserialize(&nested_reader); - (Datum::Row(Box::new(nested_row)), next) -} -``` - -Lifetime note: `nested_bytes: &'a [u8]` borrows from the outer buffer (same lifetime `'a`). The returned `GenericRow<'a>` may contain `Cow::Borrowed` string/bytes datums that also borrow from `'a`. The local `nested_deser` is dropped after the block; this is fine because `GenericRow` borrows only from the reader's data buffer, not from the deserializer. - -#### 5. `FieldGetter::create` — Row arm -**File**: `crates/fluss/src/row/field_getter.rs` - -```rust -// In FieldGetter::create match, before the _ => unimplemented!(...) arm: -DataType::Row(_) => InnerFieldGetter::Row { pos }, -``` - -#### 6. `InnerFieldGetter` — Row variant -**File**: `crates/fluss/src/row/field_getter.rs` - -```rust -// In InnerFieldGetter enum, after TimestampLtz: -Row { - pos: usize, -}, -``` - -#### 7. `InnerFieldGetter::get_field` — Row arm -**File**: `crates/fluss/src/row/field_getter.rs` - -```rust -// In get_field match, replacing the //TODO Array, Map, Row comment: -InnerFieldGetter::Row { pos } => { - Datum::Row(Box::new(row.get_row(*pos)?)) -} -``` - -Remove or update the `//TODO Array, Map, Row` comment. - -#### 8. `InnerFieldGetter::pos` — Row arm -**File**: `crates/fluss/src/row/field_getter.rs` - -Add `Row { pos }` to the `pos()` method's catch-all pattern: - -```rust -// In pos() match: -| Self::TimestampLtz { pos, .. } -| Self::Row { pos } => *pos, -``` - -### Success Criteria - -#### Automated Verification: -- [x] `cargo build -p fluss` compiles with no errors -- [x] `cargo test -p fluss` — all existing tests pass - ---- - -## Phase 3: Tests - -### Overview -Add unit tests covering simple nesting, deep nesting, and nullable nested fields. All tests exercise the full round-trip: `GenericRow` → `CompactedRowWriter` (via `ValueWriter`/`FieldGetter`) → bytes → `CompactedRowDeserializer` → `GenericRow`. - -### Changes Required - -#### 1. Round-trip tests for ROW type -**File**: `crates/fluss/src/row/compacted/compacted_row_reader.rs` (in the existing `#[cfg(test)]` module, or add a new test module at the bottom) - -```rust -#[cfg(test)] -mod row_type_tests { - use crate::metadata::{DataTypes, DataType, RowType}; - use crate::row::{Datum, GenericRow}; - use crate::row::binary::{BinaryWriter, ValueWriter}; - use crate::row::compacted::compacted_row_writer::CompactedRowWriter; - use crate::row::compacted::compacted_row_reader::{ - CompactedRowDeserializer, CompactedRowReader, - }; - use crate::row::field_getter::FieldGetter; - - fn round_trip(outer_row_type: &RowType, outer_row: &GenericRow) -> GenericRow { - // Write - let field_getters = FieldGetter::create_field_getters(outer_row_type); - let value_writers: Vec = outer_row_type - .fields() - .iter() - .map(|f| ValueWriter::create_value_writer(f.data_type(), None).unwrap()) - .collect(); - let mut writer = CompactedRowWriter::new(outer_row_type.fields().len()); - for (i, (getter, vw)) in field_getters.iter().zip(value_writers.iter()).enumerate() { - let datum = getter.get_field(outer_row as &dyn crate::row::InternalRow).unwrap(); - vw.write_value(&mut writer, i, &datum).unwrap(); - } - let bytes = writer.to_bytes(); - - // Read - let deser = CompactedRowDeserializer::new(outer_row_type); - let reader = CompactedRowReader::new( - outer_row_type.fields().len(), - bytes.as_ref(), - 0, - bytes.len(), - ); - deser.deserialize(&reader) - } - - #[test] - fn test_row_simple_nesting() { - // ROW nested inside an outer row - let inner_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataTypes::string()], - vec!["x", "label"], - ); - let outer_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], - vec!["id", "nested"], - ); - - let mut inner = GenericRow::new(2); - inner.set_field(0, 42_i32); - inner.set_field(1, "hello"); - - let mut outer = GenericRow::new(2); - outer.set_field(0, 1_i32); - outer.set_field(1, Datum::Row(Box::new(inner))); - - let result = round_trip(&outer_row_type, &outer); - - assert_eq!(result.get_int(0).unwrap(), 1); - let nested = result.get_row(1).unwrap(); - assert_eq!(nested.get_int(0).unwrap(), 42); - assert_eq!(nested.get_string(1).unwrap(), "hello"); - } - - #[test] - fn test_row_deep_nesting() { - // ROW> — two levels of nesting - let inner_inner_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int()], - vec!["n"], - ); - let inner_row_type = RowType::with_data_types_and_field_names( - vec![DataType::Row(inner_inner_row_type.clone())], - vec!["inner"], - ); - let outer_row_type = RowType::with_data_types_and_field_names( - vec![DataType::Row(inner_row_type.clone())], - vec!["outer"], - ); - - let mut innermost = GenericRow::new(1); - innermost.set_field(0, 99_i32); - - let mut middle = GenericRow::new(1); - middle.set_field(0, Datum::Row(Box::new(innermost))); - - let mut outer = GenericRow::new(1); - outer.set_field(0, Datum::Row(Box::new(middle))); - - let result = round_trip(&outer_row_type, &outer); - - let mid = result.get_row(0).unwrap(); - let inner = mid.get_row(0).unwrap(); - assert_eq!(inner.get_int(0).unwrap(), 99); - } - - #[test] - fn test_row_with_nullable_fields() { - // Outer nullable ROW column; nested row with a nullable STRING field set to null - // DataTypes::string() and RowType::with_data_types_and_field_names both default to nullable=true - let inner_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataTypes::string()], - vec!["id", "optional_name"], - ); - let outer_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataType::Row(inner_row_type.clone())], - vec!["k", "nested"], - ); - - // Case 1: non-null nested row with a null field inside - let mut inner = GenericRow::new(2); - inner.set_field(0, 7_i32); - inner.set_field(1, Datum::Null); - - let mut outer = GenericRow::new(2); - outer.set_field(0, 10_i32); - outer.set_field(1, Datum::Row(Box::new(inner))); - - let result = round_trip(&outer_row_type, &outer); - assert_eq!(result.get_int(0).unwrap(), 10); - let nested = result.get_row(1).unwrap(); - assert_eq!(nested.get_int(0).unwrap(), 7); - assert!(nested.is_null_at(1).unwrap()); - - // Case 2: outer ROW column is null - let mut outer_null = GenericRow::new(2); - outer_null.set_field(0, 20_i32); - outer_null.set_field(1, Datum::Null); - - let result2 = round_trip(&outer_row_type, &outer_null); - assert_eq!(result2.get_int(0).unwrap(), 20); - assert!(result2.is_null_at(1).unwrap()); - } -} -``` - -Note: `DataTypes::string()` creates `StringType::with_nullable(true)` — nullable by default. `RowType::with_data_types_and_field_names(...)` creates `RowType::with_nullable(true, fields)` — also nullable by default. To make non-nullable types, use the inner type constructors: `StringType::with_nullable(false)`, `RowType::with_nullable(false, fields)`. The tests above use the default nullable types, which is correct for testing null handling. - -#### 2. Key encoder test for ROW -**File**: `crates/fluss/src/row/encode/compacted_key_encoder.rs` (add to the existing `#[cfg(test)] mod tests`) - -This verifies that `CompactedKeyEncoder` works with ROW as a primary key column. No code changes to the key encoder itself — this exercises the FieldGetter + ValueWriter Row support through the key encoder pipeline. - -```rust -#[test] -fn test_row_as_primary_key() { - // ROW as a primary key column - let inner_row_type = RowType::with_data_types_and_field_names( - vec![DataTypes::int(), DataTypes::string()], - vec!["x", "label"], - ); - let row_type = RowType::with_data_types_and_field_names( - vec![ - DataTypes::int(), - DataType::Row(inner_row_type.clone()), - ], - vec!["id", "nested"], - ); - - let mut inner = GenericRow::new(2); - inner.set_field(0, 42_i32); - inner.set_field(1, "hello"); - - let mut row = GenericRow::new(2); - row.set_field(0, 1_i32); - row.set_field(1, Datum::Row(Box::new(inner))); - - let mut encoder = for_test_row_type(&row_type); - let encoded = encoder.encode_key(&row).unwrap(); - - // Verify it encodes without error and produces non-empty bytes - assert!(!encoded.is_empty()); - - // Encode the same row again to verify determinism - let encoded2 = encoder.encode_key(&row).unwrap(); - assert_eq!(encoded, encoded2); - - // Encode a different nested row and verify different output - let mut inner2 = GenericRow::new(2); - inner2.set_field(0, 99_i32); - inner2.set_field(1, "world"); - - let mut row2 = GenericRow::new(2); - row2.set_field(0, 1_i32); - row2.set_field(1, Datum::Row(Box::new(inner2))); - - let encoded3 = encoder.encode_key(&row2).unwrap(); - assert_ne!(encoded, encoded3); -} -``` - -### Success Criteria - -#### Automated Verification: -- [x] `cargo test -p fluss` — all tests pass including the new row tests -- [x] `cargo test -p fluss row_type_tests` — row round-trip tests pass -- [x] `cargo test -p fluss test_row_as_primary_key` — key encoder test passes -- [x] `cargo clippy -p fluss -- -D warnings` — no new warnings - -#### Manual Verification: -- [ ] Run tests and confirm the nested row values round-trip correctly at each nesting level -- [ ] Confirm that the `DataType::Row` arm in `CompactedRowDeserializer` no longer panics - ---- - -## Testing Strategy - -### Unit Tests -- Simple `ROW`: single level, non-null fields -- Deep `ROW>`: two levels, verifies recursion in both write and read -- Nullable outer ROW: verifies null-bit handling for the column-level nullable flag -- Nullable inner field: nested row has a nullable field set to Datum::Null -- ROW as primary key: verifies the key encoder pipeline works with Row via FieldGetter + ValueWriter - -### Not Tested Here (separate tasks) -- `ROW, MAP>` — blocked on Array/Map support -- `ColumnarRow::get_row` — blocked on StructArray extraction -- Java-compat hex test for ROW in `test_all_data_types_java_compatible` — needs Java reference hex data - ---- - -## Migration Notes - -No migration needed — this is additive. No existing wire-format bytes change. - ---- - -## References - -- Research document: `thoughts/shared/research/2026-03-15-row-nested-struct-support.md` -- Java reference — write: `CompactedRowWriter.java:339-346` -- Java reference — read: `CompactedRowReader.java:370-378` -- Wire format template: `compacted_row_writer.rs:115-130` (`write_bytes`/`write_string`) -- Read template: `compacted_row_reader.rs:275-287` (`read_bytes`/`read_string`) diff --git a/thoughts/shared/pr_description.md b/thoughts/shared/pr_description.md deleted file mode 100644 index 212e3505..00000000 --- a/thoughts/shared/pr_description.md +++ /dev/null @@ -1,31 +0,0 @@ -# PR Description Template - -Use this template to fill out PR descriptions. - -## What problem does this PR solve? - - - -## What changes were made? - - - -## Why this approach? - - - -## Breaking changes / migration notes - - - -## Changelog entry - - - -## How to verify it - - - -- [ ] `cargo test -p fluss-rs` — all tests pass -- [ ] `cargo clippy -p fluss-rs -- -D warnings` — no new warnings -- [ ] Manual: review the diff for correctness against the linked plan/research doc diff --git a/thoughts/shared/research/2026-03-15-row-nested-struct-support.md b/thoughts/shared/research/2026-03-15-row-nested-struct-support.md deleted file mode 100644 index 199cda6c..00000000 --- a/thoughts/shared/research/2026-03-15-row-nested-struct-support.md +++ /dev/null @@ -1,337 +0,0 @@ ---- -date: 2026-03-15T11:29:18Z -researcher: hemanth -git_commit: 7d4bfd663be7d3edf527ffcba56d5c370c67cf20 -branch: main -repository: hemanthsavasere/fluss-rust -topic: "ROW (nested struct) column end-to-end support" -tags: [research, codebase, row-type, datum, compacted-row, internal-row, field-getter, value-writer, key-encoder] -status: complete -last_updated: 2026-03-15 -last_updated_by: hemanth ---- - -# Research: ROW (Nested Struct) Column End-to-End Support - -**Date**: 2026-03-15T11:29:18Z -**Researcher**: hemanth -**Git Commit**: [7d4bfd6](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20) -**Branch**: main -**Repository**: hemanthsavasere/fluss-rust - ---- - -## Research Question - -Add end-to-end support for ROW (nested struct) columns. A nested Row is just a recursively-embedded CompactedRow — the same format as a top-level row, length-prefixed. The `Datum` enum needs a `Row` variant, `InternalRow` needs `get_row()`. Java references: `CompactedRowWriter.java` lines 339–346 and `CompactedRowReader.java` lines 370–378 — it's serialize-to-bytes then `write_bytes`, same as strings. The value writer, field getter, and key encoder need the same additions as the other two types. - ---- - -## Summary - -`DataType::Row(RowType)` is already fully modeled at the schema level (type definition, JSON serde, Arrow type conversion). **Nothing in that layer needs changing.** What is missing is the entire row-serialization stack: `Datum` has no `Row` variant, `InternalRow` has no `get_row()`, `CompactedRowWriter`/`CompactedRowDeserializer` panic on `DataType::Row`, and `FieldGetter`/`ValueWriter` hit `unimplemented!()`. - -The implementation pattern is the same as `String`/`Bytes` in the writer: serialize the nested row to bytes with a recursive `CompactedRowWriter`, then store with `write_bytes()` (varint length prefix + raw bytes). On the read side, `read_bytes()` returns the raw slice, which is recursively wrapped in a `CompactedRow`. No new binary format is needed. - -Array and Map are in the exact same situation — none of the serialization layers implement them either. All three complex types share the same set of `TODO` comments, `unimplemented!()` guards, and `_ => panic!` arms. - ---- - -## Detailed Findings - -### What Is Already Implemented (No Changes Needed) - -| Component | File | Status | -|---|---|---| -| `DataType::Row(RowType)` enum variant | [`metadata/datatype.rs:26-46`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L26-L46) | ✅ Fully defined | -| `RowType` struct with `fields: Vec` | [`metadata/datatype.rs:918`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L918) | ✅ Fully defined | -| `is_nullable` / `as_non_nullable` / `Display` for Row | [`metadata/datatype.rs:49-119`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L49-L119) | ✅ Dispatched | -| JSON serde for `Row` | [`metadata/json_serde.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/json_serde.rs) | ✅ Serialize + deserialize | -| Arrow type conversion (`Row` → `ArrowDataType::Struct`) | [`record/arrow.rs:1054-1076`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/record/arrow.rs#L1054-L1076) | ✅ Implemented | - -### What Needs to Be Implemented - -#### 1. `Datum` Enum — `row/datum.rs:39-71` - -**Current state**: 15 variants covering all scalar types. No `Array`, `Map`, or `Row` variant. The TODO is implicit — the comment `//TODO Array, Map, Row` appears in `field_getter.rs:180` and `binary_writer.rs:139`. - -```rust -// Current variants (datum.rs:39-71): -pub enum Datum<'a> { - Null, Bool(bool), Int8(i8), Int16(i16), Int32(i32), Int64(i64), - Float32(F32), Float64(F64), - String(Str<'a>), // Cow<'a, str> — zero-copy borrow from buffer - Blob(Blob<'a>), // Cow<'a, [u8]> — zero-copy borrow from buffer - Decimal(Decimal), Date(Date), Time(Time), - TimestampNtz(TimestampNtz), TimestampLtz(TimestampLtz), - // No Row variant yet -} -``` - -Need to add a `Row` variant carrying a nested row. The `String`/`Blob` variants use `Cow<'a, …>` for zero-copy borrowing — the Row variant will need a similar lifetime or boxing strategy. - -#### 2. `InternalRow` Trait — `row/mod.rs:61-126` - -**Current state**: 18 typed `get_*` methods (get_boolean through get_bytes). No `get_row()`. - -```rust -// Current trait (mod.rs:61-126) — abridged: -pub trait InternalRow: Send + Sync { - fn get_field_count(&self) -> usize; - fn is_null_at(&self, pos: usize) -> Result; - fn get_boolean(&self, pos: usize) -> Result; - // ... 15 more typed getters ... - fn get_bytes(&self, pos: usize) -> Result<&[u8]>; - // No get_row yet -} -``` - -Need to add: `fn get_row(&self, pos: usize) -> Result<...>` with an appropriate return type (boxed trait object or concrete type). Must be implemented on `GenericRow`, `CompactedRow`, and `ColumnarRow`. - -#### 3. `CompactedRowWriter` + `BinaryWriter` Trait - -**Current state**: `BinaryWriter` trait has TODO stubs for `write_array` and `write_row` commented out at [`binary_writer.rs:70-74`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L70-L74). `CompactedRowWriter` has no `write_row` method. - -The reference pattern in the same writer is `write_bytes` / `write_string` ([`compacted_row_writer.rs:115-130`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_writer.rs#L115-L130)): - -```rust -// write_bytes: varint(len) + raw bytes -fn write_bytes(&mut self, value: &[u8]) { - self.write_int(value.len() as i32); - self.write_raw(value); -} - -// write_string: delegates to write_bytes -fn write_string(&mut self, value: &str) { - self.write_bytes(value.as_ref()); -} -``` - -Java's `CompactedRowWriter.java:339-346` does the same for nested rows: serialize the nested row to bytes, then call `writeBytes`. The Rust implementation should: - -```rust -fn write_row(&mut self, value: &dyn InternalRow, row_type: &RowType) { - let mut nested_writer = CompactedRowWriter::new(row_type.fields().len()); - // ... encode each field of value into nested_writer ... - let bytes = nested_writer.to_bytes(); - self.write_bytes(&bytes); -} -``` - -#### 4. `CompactedRowDeserializer` — `row/compacted/compacted_row_reader.rs:52-171` - -**Current state**: The `deserialize` method's `DataType` match falls through to `panic!` for all unsupported types at [`compacted_row_reader.rs:163-165`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L163-L165): - -```rust -_ => { - panic!("Unsupported DataType in CompactedRowDeserializer: {dtype:?}"); -} -``` - -The reference pattern is `read_bytes` / `read_string` ([`compacted_row_reader.rs:275-287`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L275-L287)): - -```rust -// read_bytes: returns (&[u8], next_cursor) -pub fn read_bytes(&self, pos: usize) -> (&[u8], usize) { - let (len, data_pos) = self.read_int(pos); - (&self.segment[data_pos..data_pos + len as usize], data_pos + len as usize) -} - -pub fn read_string(&self, pos: usize) -> (&str, usize) { - let (bytes, next) = self.read_bytes(pos); - (std::str::from_utf8(bytes).unwrap(), next) -} -``` - -Java's `CompactedRowReader.java:370-378` does the same for nested rows: call `readBytes`, then deserialize the result. The Rust implementation should: - -```rust -DataType::Row(row_type) => { - let (nested_bytes, next_cursor) = reader.read_bytes(cursor); - let nested_row = CompactedRow::from_bytes(row_type, nested_bytes); - row.set_field(i, Datum::Row(Box::new(nested_row))); - cursor = next_cursor; -} -``` - -#### 5. `FieldGetter` + `InnerFieldGetter` — `row/field_getter.rs` - -**Current state**: `InnerFieldGetter` has 16 variants ([`field_getter.rs:97-152`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L97-L152)). `create()` hits `unimplemented!()` for unknown types ([`field_getter.rs:85`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L85)). `get_field` has `//TODO Array, Map, Row` at [`field_getter.rs:180`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L180). - -All existing variants follow the same pattern: -```rust -// Example: InnerFieldGetter::String { pos } -InnerFieldGetter::String { pos } => { - Datum::String(Str::from(row.get_string(*pos)?)) -} -``` - -Need to add: -```rust -// In InnerFieldGetter enum: -Row { pos }, // (RowType not needed here since get_row returns the whole row) - -// In create(): -DataType::Row(_) => InnerFieldGetter::Row { pos }, - -// In get_field(): -InnerFieldGetter::Row { pos } => { - Datum::Row(row.get_row(*pos)?) -} -``` - -#### 6. `InnerValueWriter` + `create_inner_value_writer` + `write_value` — `row/binary/binary_writer.rs` - -**Current state**: `InnerValueWriter` has 16 variants ([`binary_writer.rs:122-140`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L122-L140)) with `// TODO Array, Row` at line 139. `create_inner_value_writer` hits `unimplemented!()` at [`binary_writer.rs:178`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L178). `write_value` hits `Err(IllegalArgument)` for mismatched pairs. - -The existing variant pattern (Decimal stores type params): -```rust -// In InnerValueWriter enum: -Decimal(u32, u32), // precision, scale -TimestampNtz(u32), // precision -``` - -For Row, the `RowType` is needed to recursively encode fields: -```rust -// In InnerValueWriter enum: -Row(RowType), // schema of the nested row - -// In create_inner_value_writer(): -DataType::Row(row_type) => InnerValueWriter::Row(row_type.clone()), - -// In write_value(): -(InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => { - writer.write_row(inner_row.as_ref(), row_type)?; -} -``` - -#### 7. `CompactedKeyEncoder` — `row/encode/compacted_key_encoder.rs:266-268` - -**Current state**: The Java-compatibility test has explicit TODO comments: -```rust -// TODO: Add support for ARRAY type -// TODO: Add support for MAP type -// TODO: Add support for ROW type -``` - -ROW as a primary key column would flow through `FieldGetter` and `ValueWriter` just like any other type. Once steps 5 and 6 above are done, `CompactedKeyEncoder` should work for Row automatically (since it constructs its `FieldGetter`s and `ValueWriter`s by calling `FieldGetter::create` and `ValueWriter::create_value_writer`). The test just needs the TODO comments replaced with actual test cases. - ---- - -## Wire Format Documentation - -The nested-row wire format follows the same length-prefixed bytes pattern as `String` and `Bytes`: - -``` -[ varint(nested_row_byte_length) ][ nested_row_bytes ... ] -``` - -Where `nested_row_bytes` is a complete compacted row: -``` -[ null-bit-header: ceil(field_count/8) bytes ][ field_0 ][ field_1 ] ... [ field_N ] -``` - -This is recursive — a `Row>` stores: -``` -varint(outer_len) [ - null-bits(1 byte) - varint(inner_len) [ - null-bits(1 byte) - varint(int_value) - ] -] -``` - -No new binary primitives are needed. The existing `write_bytes`/`read_bytes` plus recursive `CompactedRowWriter`/`CompactedRowDeserializer` cover everything. - ---- - -## Code References - -| File | Line(s) | Description | -|---|---|---| -| [`row/datum.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/datum.rs#L39-L71) | 39–71 | `Datum` enum — add `Row` variant here | -| [`row/mod.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/mod.rs#L61-L126) | 61–126 | `InternalRow` trait — add `get_row()` here | -| [`row/compacted/compacted_row_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_writer.rs#L115-L130) | 115–130 | `write_bytes`/`write_string` — reference pattern for `write_row` | -| [`row/compacted/compacted_row_reader.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L163-L165) | 163–165 | `panic!` arm in deserializer — replace with `DataType::Row` arm | -| [`row/compacted/compacted_row_reader.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/compacted/compacted_row_reader.rs#L275-L287) | 275–287 | `read_bytes`/`read_string` — reference pattern for reading nested rows | -| [`row/field_getter.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L85) | 85 | `unimplemented!()` in `create()` — add `DataType::Row` arm | -| [`row/field_getter.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/field_getter.rs#L97-L180) | 97–180 | `InnerFieldGetter` enum + `get_field` dispatch — add `Row` variant | -| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L70-L74) | 70–74 | `BinaryWriter` trait — uncomment/add `write_row` method | -| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L122-L140) | 122–140 | `InnerValueWriter` enum — add `Row(RowType)` variant | -| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L145-L181) | 145–181 | `create_inner_value_writer` — add `DataType::Row` arm | -| [`row/binary/binary_writer.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/binary/binary_writer.rs#L184-L247) | 184–247 | `write_value` dispatch — add `(Row(t), Datum::Row(r))` arm | -| [`row/encode/compacted_key_encoder.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/row/encode/compacted_key_encoder.rs#L266-L268) | 266–268 | TODO comments for Array/Map/Row in Java-compat test | -| [`metadata/datatype.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/metadata/datatype.rs#L918) | 918 | `RowType` struct — already fully defined, no changes needed | -| [`record/arrow.rs`](https://github.com/hemanthsavasere/fluss-rust/blob/7d4bfd663be7d3edf527ffcba56d5c370c67cf20/crates/fluss/src/record/arrow.rs#L1054-L1076) | 1054–1076 | Arrow `Struct` conversion for Row — already implemented | - ---- - -## Architecture Documentation - -### Existing Patterns That ROW Must Follow - -**String/Bytes in `CompactedRowWriter`** (the write-side template): -1. `write_bytes(value)` = `write_int(len)` + `write_raw(bytes)` (`compacted_row_writer.rs:115-120`) -2. `write_string(value)` = `write_bytes(value.as_ref())` (`compacted_row_writer.rs:128-130`) -3. ROW follows: `write_row(row, type)` = serialize row → bytes, then `write_bytes(bytes)` - -**String/Bytes in `CompactedRowDeserializer`** (the read-side template): -1. `read_bytes(pos)` = `read_int(pos)` → len, return `(segment[pos..pos+len], pos+len)` (`compacted_row_reader.rs:275-281`) -2. `read_string(pos)` = `read_bytes(pos)` + `from_utf8` (`compacted_row_reader.rs:283-287`) -3. ROW follows: `read_bytes(pos)` → raw slice, then `CompactedRow::from_bytes(row_type, slice)` - -**`Decimal(precision, scale)` in `InnerValueWriter`** (template for type params): -- The variant carries type parameters needed at write time -- `InnerValueWriter::Row(RowType)` follows the same pattern — RowType carries the field schema - -**`TimestampNtz(precision)` / `TimestampLtz(precision)` in `InnerFieldGetter`** (template for type params in FieldGetter): -- The variant carries read parameters needed at get time -- `InnerFieldGetter::Row { pos }` is simpler — no extra params needed beyond `pos` since `get_row` returns the whole row - -### Data Flow for ROW Fields (once implemented) - -**Write path**: -``` -GenericRow { field: Datum::Row(inner_row) } - → FieldGetter::Row { pos }.get_field(outer_row) - → Datum::Row(inner_row) - → InnerValueWriter::Row(row_type).write_value(writer, pos, datum) - → CompactedRowWriter::write_row(inner_row, row_type) - → nested CompactedRowWriter serializes inner_row - → write_bytes(nested_bytes) // varint(len) + raw -``` - -**Read path**: -``` -raw bytes from wire - → CompactedRowDeserializer: DataType::Row(row_type) arm - → reader.read_bytes(cursor) // read varint(len) + raw slice - → CompactedRow::from_bytes(row_type, slice) - → store as Datum::Row(nested_compacted_row) - → cached in GenericRow via OnceLock - → outer row.get_row(pos) returns the nested CompactedRow -``` - ---- - -## Planned Tests - -Per the original task description: - -1. **Simple nested row** — `ROW`, single level of nesting -2. **Deeply nested** — `ROW>`, recursive nesting -3. **Nullable fields** — nested row containing nullable fields; outer nullable ROW column -4. **Combination** (separate issue) — `ROW, MAP>` once Array and Map also land - ---- - -## Open Questions - -1. **Return type of `get_row`** — Should it return `Box`, `&dyn InternalRow`, or a concrete type like `CompactedRow`? The lifetime constraints from `GenericRow<'a>` (which stores `Datum<'a>`) may require a lifetime on the return type. - -2. **`Datum::Row` inner type** — `Box>` works but loses the zero-copy property that `Cow<'a, …>` provides for String/Blob. Alternatively, storing the raw bytes in `Datum::Row(Blob<'a>)` and lazily deserializing preserves zero-copy but complicates access patterns. - -3. **`write_row` signature in `BinaryWriter` trait** — Needs `&RowType` to know how many fields and their types for serializing. Whether the encoder carries the `RowType` in `InnerValueWriter::Row(RowType)` or passes it at call time affects the API. - -4. **`ColumnarRow::get_row`** — Extracting a struct sub-row from an Arrow `StructArray` is more complex than the other implementations. This may need its own sub-task. From c5ca7c35f4a55ecb47f6ec7d59b242e5fb3efc30 Mon Sep 17 00:00:00 2001 From: hemanthsavasere Date: Sun, 15 Mar 2026 18:50:05 +0000 Subject: [PATCH 3/3] chore: restore .gitignore to match main Co-Authored-By: Claude Sonnet 4.6 --- .gitignore | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index f16dcbaa..5d11a1c0 100644 --- a/.gitignore +++ b/.gitignore @@ -53,6 +53,4 @@ website/package-lock.json website/versioned_docs website/versioned_sidebars website/versions.json -website/pnpm-lock.yaml - -thoughts/ \ No newline at end of file +website/pnpm-lock.yaml \ No newline at end of file