Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bindings/cpp/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,7 @@ pub fn resolve_row_types(
Datum::Time(t) => Datum::Time(*t),
Datum::TimestampNtz(ts) => Datum::TimestampNtz(*ts),
Datum::TimestampLtz(ts) => Datum::TimestampLtz(*ts),
Datum::Row(r) => Datum::Row(Box::new(resolve_row_types(r, None)?)),
};
out.set_field(idx, resolved);
}
Expand Down
25 changes: 23 additions & 2 deletions crates/fluss/src/row/binary/binary_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::error::Error::IllegalArgument;
use crate::error::Result;
use crate::metadata::DataType;
use crate::metadata::{DataType, RowType};
use crate::row::Datum;
use crate::row::binary::BinaryRowFormat;

Expand Down Expand Up @@ -136,7 +136,7 @@ pub enum InnerValueWriter {
Time(u32), // precision (not used in wire format, but kept for consistency)
TimestampNtz(u32), // precision
TimestampLtz(u32), // precision
// TODO Array, Row
Row(RowType),
}

/// Accessor for writing the fields/elements of a binary writer during runtime, the
Expand Down Expand Up @@ -175,6 +175,7 @@ impl InnerValueWriter {
// Validation is done at TimestampLTzType construction time
Ok(InnerValueWriter::TimestampLtz(t.precision()))
}
DataType::Row(row_type) => Ok(InnerValueWriter::Row(row_type.clone())),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think we should not store clone in innervaluewriter::row, probabl store in pre built child writer is better?

_ => unimplemented!(
"ValueWriter for DataType {:?} is currently not implemented",
data_type
Expand Down Expand Up @@ -237,6 +238,26 @@ impl InnerValueWriter {
(InnerValueWriter::TimestampLtz(p), Datum::TimestampLtz(ts)) => {
writer.write_timestamp_ltz(ts, *p);
}
(InnerValueWriter::Row(row_type), Datum::Row(inner_row)) => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

currently, i think a new writer is created per write call, which is not ideal

use crate::row::compacted::CompactedRowWriter;
let field_count = row_type.fields().len();
let mut nested = CompactedRowWriter::new(field_count);
for (i, field) in row_type.fields().iter().enumerate() {
let datum = &inner_row.values[i];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

potential panic on OOB?

if datum.is_null() {
if field.data_type.is_nullable() {
nested.set_null_at(i);
}
} else {
let vw =
InnerValueWriter::create_inner_value_writer(&field.data_type, None)
.expect("create_inner_value_writer failed for nested row field");
vw.write_value(&mut nested, i, datum)
.expect("write_value failed for nested row field");
}
Comment on lines +253 to +257
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think in current way, it will panic inside a Result, does it mean those will be hidden for users?

}
writer.write_bytes(nested.buffer());
}
_ => {
return Err(IllegalArgument {
message: format!("{self:?} used to write value {value:?}"),
Expand Down
Loading