diff --git a/bindings/cpp/src/types.rs b/bindings/cpp/src/types.rs index f8efe677..93aa5ade 100644 --- a/bindings/cpp/src/types.rs +++ b/bindings/cpp/src/types.rs @@ -351,6 +351,9 @@ pub fn resolve_row_types( Datum::Time(t) => Datum::Time(*t), Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts), Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts), + // TODO: C++ bindings need proper CXX wrapper types for FlussArray + // before C++ users can construct or inspect array values through FFI. + Datum::Array(a) => Datum::Array(a.clone()), }; out.set_field(idx, resolved); } @@ -408,6 +411,9 @@ pub fn compacted_row_to_owned( fcore::metadata::DataType::Binary(dt) => { Datum::Blob(Cow::Owned(row.get_binary(i, dt.length())?.to_vec())) } + // TODO: C++ bindings need proper CXX wrapper types for FlussArray + // before C++ users can construct or inspect array values through FFI. + fcore::metadata::DataType::Array(_) => Datum::Array(row.get_array(i)?), other => return Err(anyhow!("Unsupported data type for column {i}: {other:?}")), }; diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs index d8ba6d95..7dd745ba 100644 --- a/crates/fluss/src/record/arrow.rs +++ b/crates/fluss/src/record/arrow.rs @@ -1091,6 +1091,71 @@ pub fn to_arrow_type(fluss_type: &DataType) -> Result { }) } +/// Converts an Arrow data type back to a Fluss `DataType`. +/// Used for reading array elements from Arrow ListArray back into Fluss types. +pub(crate) fn from_arrow_type(arrow_type: &ArrowDataType) -> Result { + use crate::metadata::DataTypes; + + Ok(match arrow_type { + ArrowDataType::Boolean => DataTypes::boolean(), + ArrowDataType::Int8 => DataTypes::tinyint(), + ArrowDataType::Int16 => DataTypes::smallint(), + ArrowDataType::Int32 => DataTypes::int(), + ArrowDataType::Int64 => DataTypes::bigint(), + ArrowDataType::Float32 => DataTypes::float(), + ArrowDataType::Float64 => DataTypes::double(), + ArrowDataType::Utf8 => DataTypes::string(), + ArrowDataType::Binary => DataTypes::bytes(), + ArrowDataType::Date32 => DataTypes::date(), + ArrowDataType::FixedSizeBinary(len) => { + if *len < 0 { + return Err(Error::IllegalArgument { + message: format!("FixedSizeBinary length must be >= 0, got {len}"), + }); + } + DataTypes::binary(*len as usize) + } + ArrowDataType::Decimal128(p, s) => { + if *s < 0 { + return Err(Error::IllegalArgument { + message: format!("Decimal scale must be >= 0, got {s}"), + }); + } + DataTypes::decimal(*p as u32, *s as u32) + } + ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => DataTypes::time_with_precision(0), + ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => { + DataTypes::time_with_precision(3) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => { + DataTypes::time_with_precision(6) + } + ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => { + DataTypes::time_with_precision(9) + } + ArrowDataType::Timestamp(unit, tz) => { + let precision = match unit { + arrow_schema::TimeUnit::Second => 0, + arrow_schema::TimeUnit::Millisecond => 3, + arrow_schema::TimeUnit::Microsecond => 6, + arrow_schema::TimeUnit::Nanosecond => 9, + }; + + if tz.is_some() { + DataTypes::timestamp_ltz_with_precision(precision) + } else { + DataTypes::timestamp_with_precision(precision) + } + } + ArrowDataType::List(field) => DataTypes::array(from_arrow_type(field.data_type())?), + other => { + return Err(Error::IllegalArgument { + message: format!("Cannot convert Arrow type to Fluss type: {other:?}"), + }); + } + }) +} + #[derive(Clone)] pub struct ReadContext { target_schema: SchemaRef, diff --git a/crates/fluss/src/row/binary/binary_writer.rs b/crates/fluss/src/row/binary/binary_writer.rs index af2765c4..f51a6e80 100644 --- a/crates/fluss/src/row/binary/binary_writer.rs +++ b/crates/fluss/src/row/binary/binary_writer.rs @@ -67,8 +67,7 @@ pub trait BinaryWriter { fn write_timestamp_ltz(&mut self, value: &crate::row::datum::TimestampLtz, precision: u32); - // TODO InternalArray, ArraySerializer - // fn write_array(&mut self, pos: i32, value: i64); + fn write_array(&mut self, value: &[u8]); // TODO Row serializer // fn write_row(&mut self, pos: i32, value: &InternalRow); @@ -136,7 +135,8 @@ pub enum InnerValueWriter { Time(u32), // precision (not used in wire format, but kept for consistency) TimestampNtz(u32), // precision TimestampLtz(u32), // precision - // TODO Array, Row + Array, + // TODO Row } /// Accessor for writing the fields/elements of a binary writer during runtime, the @@ -175,6 +175,7 @@ impl InnerValueWriter { // Validation is done at TimestampLTzType construction time Ok(InnerValueWriter::TimestampLtz(t.precision())) } + DataType::Array(_) => Ok(InnerValueWriter::Array), _ => unimplemented!( "ValueWriter for DataType {:?} is currently not implemented", data_type @@ -237,6 +238,9 @@ impl InnerValueWriter { (InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => { writer.write_timestamp_ltz(ts, *p); } + (InnerValueWriter::Array, Datum::Array(arr)) => { + writer.write_array(arr.as_bytes()); + } _ => { return Err(IllegalArgument { message: format!("{self:?} used to write value {value:?}"), diff --git a/crates/fluss/src/row/binary_array.rs b/crates/fluss/src/row/binary_array.rs new file mode 100644 index 00000000..9008bc54 --- /dev/null +++ b/crates/fluss/src/row/binary_array.rs @@ -0,0 +1,848 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Binary array format matching Java's `BinaryArray.java` layout. +//! +//! Binary layout: +//! ```text +//! [size(4B)] + [null bits (4-byte word aligned)] + [fixed-length part] + [variable-length part] +//! ``` +//! +//! Java reference: `BinaryArray.java`, `BinaryArrayWriter.java` + +use crate::error::Error::IllegalArgument; +use crate::error::Result; +use crate::metadata::DataType; +use crate::row::Decimal; +use crate::row::datum::{Date, Time, TimestampLtz, TimestampNtz}; +use bytes::Bytes; +use serde::Serialize; +use std::fmt; +use std::hash::{Hash, Hasher}; + +const MAX_FIX_PART_DATA_SIZE: usize = 7; +const HIGHEST_FIRST_BIT: u64 = 0x80_u64 << 56; +const HIGHEST_SECOND_TO_EIGHTH_BIT: u64 = 0x7F_u64 << 56; + +/// Calculates the header size in bytes: 4 (for element count) + null bits (4-byte word aligned). +/// Matches Java's `BinaryArray.calculateHeaderInBytes(numFields)`. +pub fn calculate_header_in_bytes(num_elements: usize) -> usize { + 4 + num_elements.div_ceil(32) * 4 +} + +/// Calculates the fixed-length part size per element for a given data type. +/// Matches Java's `BinaryArray.calculateFixLengthPartSize(DataType)`. +pub fn calculate_fix_length_part_size(element_type: &DataType) -> usize { + match element_type { + DataType::Boolean(_) | DataType::TinyInt(_) => 1, + DataType::SmallInt(_) => 2, + DataType::Int(_) | DataType::Float(_) | DataType::Date(_) | DataType::Time(_) => 4, + DataType::BigInt(_) + | DataType::Double(_) + | DataType::Char(_) + | DataType::String(_) + | DataType::Binary(_) + | DataType::Bytes(_) + | DataType::Decimal(_) + | DataType::Timestamp(_) + | DataType::TimestampLTz(_) + | DataType::Array(_) + | DataType::Map(_) + | DataType::Row(_) => 8, + } +} + +/// Rounds a byte count up to the nearest 8-byte word boundary. +/// Matches Java's `roundNumberOfBytesToNearestWord`. +fn round_to_nearest_word(num_bytes: usize) -> usize { + (num_bytes + 7) & !7 +} + +/// A Fluss binary array, wire-compatible with Java's `BinaryArray`. +/// +/// Stores elements in a flat byte buffer with a header (element count + null bitmap) +/// followed by fixed-length slots and an optional variable-length section. +/// +/// Uses `Bytes` internally so cloning is O(1) reference-counted. +// TODO: FlussArray currently exposes only fallible getters. Infallible +// fast-path variants may be added later as non-breaking extensions. +#[derive(Clone)] +pub struct FlussArray { + data: Bytes, + size: usize, + element_offset: usize, +} + +impl fmt::Debug for FlussArray { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FlussArray") + .field("size", &self.size) + .field("data_len", &self.data.len()) + .finish() + } +} + +impl fmt::Display for FlussArray { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "FlussArray[size={}]", self.size) + } +} + +impl PartialEq for FlussArray { + fn eq(&self, other: &Self) -> bool { + self.data == other.data + } +} + +impl Eq for FlussArray {} + +impl PartialOrd for FlussArray { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for FlussArray { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.data.cmp(&other.data) + } +} + +impl Hash for FlussArray { + fn hash(&self, state: &mut H) { + self.data.hash(state); + } +} + +impl Serialize for FlussArray { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + serializer.serialize_bytes(&self.data) + } +} + +impl FlussArray { + /// Validates the raw bytes and computes derived fields (size, element_offset). + fn validate(data: &[u8]) -> Result<(usize, usize)> { + if data.len() < 4 { + return Err(IllegalArgument { + message: format!( + "FlussArray data too short: need at least 4 bytes, got {}", + data.len() + ), + }); + } + let raw_size = i32::from_le_bytes(data[0..4].try_into().unwrap()); + if raw_size < 0 { + return Err(IllegalArgument { + message: format!("FlussArray size must be non-negative, got {raw_size}"), + }); + } + let size = raw_size as usize; + let element_offset = calculate_header_in_bytes(size); + if element_offset > data.len() { + return Err(IllegalArgument { + message: format!( + "FlussArray header exceeds payload: header={}, payload={}", + element_offset, + data.len() + ), + }); + } + Ok((size, element_offset)) + } + + /// Creates a FlussArray from a byte slice (copies data). + pub fn from_bytes(data: &[u8]) -> Result { + let (size, element_offset) = Self::validate(data)?; + Ok(FlussArray { + data: Bytes::copy_from_slice(data), + size, + element_offset, + }) + } + + /// Creates a FlussArray from an owned `Vec` without copying. + pub fn from_vec(data: Vec) -> Result { + let (size, element_offset) = Self::validate(&data)?; + Ok(FlussArray { + data: Bytes::from(data), + size, + element_offset, + }) + } + + /// Creates a FlussArray from owned bytes without copying. + fn from_owned_bytes(data: Bytes) -> Result { + let (size, element_offset) = Self::validate(&data)?; + Ok(FlussArray { + data, + size, + element_offset, + }) + } + + /// Returns the number of elements. + pub fn size(&self) -> usize { + self.size + } + + /// Returns the raw bytes of this array (the complete binary representation). + pub fn as_bytes(&self) -> &[u8] { + &self.data + } + + /// Returns true if the element at position `pos` is null. + pub fn is_null_at(&self, pos: usize) -> bool { + let byte_index = pos >> 3; + let bit = pos & 7; + (self.data[4 + byte_index] & (1u8 << bit)) != 0 + } + + fn checked_slice(&self, start: usize, len: usize, context: &str) -> Result<&[u8]> { + let end = start.checked_add(len).ok_or_else(|| IllegalArgument { + message: format!("Overflow while reading {context}: start={start}, len={len}"), + })?; + if end > self.data.len() { + return Err(IllegalArgument { + message: format!( + "Out-of-bounds while reading {context}: start={start}, len={len}, payload={}", + self.data.len() + ), + }); + } + Ok(&self.data[start..end]) + } + + fn checked_element_offset( + &self, + pos: usize, + element_size: usize, + context: &str, + ) -> Result { + if pos >= self.size { + return Err(IllegalArgument { + message: format!( + "Array element index out of bounds while reading {context}: pos={pos}, size={}", + self.size + ), + }); + } + let rel = pos.checked_mul(element_size).ok_or_else(|| IllegalArgument { + message: format!( + "Overflow while calculating array element offset for {context}: pos={pos}, element_size={element_size}" + ), + })?; + self.element_offset + .checked_add(rel) + .ok_or_else(|| IllegalArgument { + message: format!( + "Overflow while adding base offset for {context}: base={}, rel={rel}", + self.element_offset + ), + }) + } + + fn read_fixed_bytes(&self, pos: usize, len: usize, context: &str) -> Result<&[u8]> { + let offset = self.checked_element_offset(pos, len, context)?; + self.checked_slice(offset, len, context) + } + + fn read_i16(&self, pos: usize, context: &str) -> Result { + let bytes = self.read_fixed_bytes(pos, 2, context)?; + Ok(i16::from_le_bytes([bytes[0], bytes[1]])) + } + + fn read_i32(&self, pos: usize, context: &str) -> Result { + let bytes = self.read_fixed_bytes(pos, 4, context)?; + Ok(i32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])) + } + + fn read_i64(&self, pos: usize, context: &str) -> Result { + let bytes = self.read_fixed_bytes(pos, 8, context)?; + let mut buf = [0_u8; 8]; + buf.copy_from_slice(bytes); + Ok(i64::from_le_bytes(buf)) + } + + fn read_i64_at_offset(&self, offset: usize, context: &str) -> Result { + let bytes = self.checked_slice(offset, 8, context)?; + let mut buf = [0_u8; 8]; + buf.copy_from_slice(bytes); + Ok(i64::from_le_bytes(buf)) + } + + fn read_var_len_span(&self, pos: usize) -> Result<(usize, usize)> { + let field_offset = self.checked_element_offset(pos, 8, "variable-length array element")?; + let packed = self.read_i64(pos, "variable-length array element")? as u64; + let mark = packed & HIGHEST_FIRST_BIT; + + if mark == 0 { + let offset = (packed >> 32) as usize; + let len = (packed & 0xFFFF_FFFF) as usize; + let _ = self.checked_slice(offset, len, "variable-length array element")?; + Ok((offset, len)) + } else { + let len = ((packed & HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56) as usize; + if len > MAX_FIX_PART_DATA_SIZE { + return Err(IllegalArgument { + message: format!( + "Inline array element length must be <= {MAX_FIX_PART_DATA_SIZE}, got {len}" + ), + }); + } + // Java stores inline bytes in the 8-byte slot itself. + // On little-endian, bytes start at field_offset; on big-endian they start at +1. + let start = if cfg!(target_endian = "little") { + field_offset + } else { + field_offset + 1 + }; + let _ = self.checked_slice(start, len, "inline array element")?; + Ok((start, len)) + } + } + + fn read_var_len_bytes(&self, pos: usize) -> Result<&[u8]> { + let (start, len) = self.read_var_len_span(pos)?; + Ok(&self.data[start..start + len]) + } + + pub fn get_boolean(&self, pos: usize) -> Result { + let bytes = self.read_fixed_bytes(pos, 1, "boolean array element")?; + Ok(bytes[0] != 0) + } + + pub fn get_byte(&self, pos: usize) -> Result { + let bytes = self.read_fixed_bytes(pos, 1, "byte array element")?; + Ok(bytes[0] as i8) + } + + pub fn get_short(&self, pos: usize) -> Result { + self.read_i16(pos, "short array element") + } + + pub fn get_int(&self, pos: usize) -> Result { + self.read_i32(pos, "int array element") + } + + pub fn get_long(&self, pos: usize) -> Result { + self.read_i64(pos, "long array element") + } + + pub fn get_float(&self, pos: usize) -> Result { + let bits = self.read_i32(pos, "float array element")? as u32; + Ok(f32::from_bits(bits)) + } + + pub fn get_double(&self, pos: usize) -> Result { + let bits = self.read_i64(pos, "double array element")? as u64; + Ok(f64::from_bits(bits)) + } + + /// Reads the offset_and_size packed long for variable-length elements. + fn get_offset_and_size(&self, pos: usize) -> Result<(usize, usize)> { + let packed = self.get_long(pos)? as u64; + let offset = (packed >> 32) as usize; + let size = (packed & 0xFFFF_FFFF) as usize; + Ok((offset, size)) + } + + pub fn get_string(&self, pos: usize) -> Result<&str> { + let bytes = self.read_var_len_bytes(pos)?; + std::str::from_utf8(bytes).map_err(|e| IllegalArgument { + message: format!("Invalid UTF-8 in array element at position {pos}: {e}"), + }) + } + + pub fn get_binary(&self, pos: usize) -> Result<&[u8]> { + self.read_var_len_bytes(pos) + } + + pub fn get_decimal(&self, pos: usize, precision: u32, scale: u32) -> Result { + if Decimal::is_compact_precision(precision) { + let unscaled = self.get_long(pos)?; + Decimal::from_unscaled_long(unscaled, precision, scale) + } else { + let (offset, size) = self.get_offset_and_size(pos)?; + let bytes = self.checked_slice(offset, size, "decimal bytes")?; + Decimal::from_unscaled_bytes(bytes, precision, scale) + } + } + + pub fn get_date(&self, pos: usize) -> Result { + Ok(Date::new(self.get_int(pos)?)) + } + + pub fn get_time(&self, pos: usize) -> Result