Conversation
…l view Adds a one-liner method that returns a copy of the Schema with optional_fields stripped, enabling `s1.as_required() == s2.as_required()` as the correct predicate for "same Arrow structure". Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n_schema_to_arrow_schema Plain types (int, str, …) now produce nullable=False Arrow fields, while T | None (Optional[T]) types produce nullable=True. Added _is_optional_type helper and two new tests covering both cases. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…row_schema_to_python_schema Adds nullable-aware round-trip in arrow_schema_to_python_schema: nullable=True fields now reconstruct as T | None and nullable=False as plain T. To prevent spurious T | None from Arrow's default nullable=True convention, normalise raw Arrow table schemas to nullable=False in ArrowTableStream, Datagram, and StreamBuilder before calling arrow_schema_to_python_schema. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…xtract normalization helper Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…suite (PLT-923) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
```python
non_sys = arrow_data_utils.drop_system_columns(table)
tag_schema = non_sys.select(list(tag_columns_tuple)).schema
packet_schema = non_sys.drop(list(tag_columns_tuple)).schema
# Normalise to non-nullable: raw Arrow tables default to nullable=True for
```
Actually, whether the nullability of the Arrow table is respected ought to be configurable. What if the nullability was intentional?
Addressed in dd6fd89. Added respect_nullable: bool = False to SourceStreamBuilder.build(). Default strips nullable before schema hashing (preserving existing behavior). With respect_nullable=True, nullable=True fields yield T | None in the Python schema and the hash reflects that — so intentional nullability is fully respected.
```python
tag_schema = pa.schema(
    f for f in self._table.schema if f.name in self._tag_columns
# Cast all schemas to non-nullable: Arrow's default nullable=True is a
# storage convention, not a semantic declaration. The Python schema
```
This is too strong an assumption. While it's fair to assume that nullability is likely NOT intended, the current behavior prevents the user from ever meaningfully specifying Arrow tables with properly configured nullability. Whether nullability is dropped should be configurable; defaulting to dropping is likely acceptable.
Addressed in dd6fd89. Added `respect_nullable: bool = False` to `ArrowTableStream.__init__()`. The default (False) normalises all fields to non-nullable before schema conversion, guarding against Arrow's storage convention of defaulting to nullable=True. Set `respect_nullable=True` to opt in to preserving the Arrow nullable flags, so nullable=True fields produce `T | None` in `output_schema()`.
```diff
-system_tag_schema = pa.schema(
-    f for f in self._table.schema if f.name in self._system_tag_columns
+system_tag_schema = arrow_utils.make_schema_non_nullable(
+    pa.schema(
```
Same as the comment above on assuming nullability should always be dropped.
Addressed in dd6fd89. Same fix applied here (`Datagram.__init__`): added `respect_nullable: bool = False`. When False (default), raw Arrow schemas are stripped to non-nullable before Python schema derivation. When True, nullable=True fields yield `T | None`. The flag is also preserved across `copy()` calls.
…eam, Datagram, SourceStreamBuilder Add respect_nullable: bool = False parameter to ArrowTableStream.__init__, Datagram.__init__, and SourceStreamBuilder.build(). When False (default), raw Arrow tables are normalised to non-nullable before schema conversion — preserving the existing behavior and protecting against Arrow's storage convention of defaulting all fields to nullable=True. When True, users can opt-in to having nullable=True fields yield T | None Python types. Addresses PR review feedback requesting that nullability-dropping be configurable rather than unconditional. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review feedback addressed (dd6fd89)

All three review comments requested that the nullable-stripping behaviour be configurable rather than always-on. This has been implemented as a `respect_nullable: bool = False` parameter on `ArrowTableStream.__init__`, `Datagram.__init__`, and `SourceStreamBuilder.build()`.

- Default behaviour (`respect_nullable=False`): schemas are normalised to non-nullable before conversion, preserving the existing behavior.
- Opt-in (`respect_nullable=True`): nullable=True fields yield `T | None` in the Python schema, and the hash reflects that.

Nine new tests cover the configurable behaviour across all three classes. Full test suite: 3104 passed, 56 skipped.
…der; operators preserve Arrow nullable flags
ArrowTableStream and Datagram now always respect Arrow nullable flags:
- nullable=True → T | None in output_schema / schema()
- nullable=False → T in output_schema / schema()
The only place that strips nullable flags is SourceStreamBuilder.build()
(respect_nullable=False by default), which represents the data-ingestion
boundary where real-world data arrives without semantic optionality.
Corruption sites fixed:
- semijoin.py: cast back to left_table.schema after Arrow left-semi join
- merge_join.py: use from_arrays instead of cast for list columns
- batch.py: use from_arrays to restore nullable=False after pa.Table.from_pylist
- join.py: restore nullable=False for user-data and system-tag columns
after Polars inner join; predict system-tag schema as str | None when
sources differ
- arrow_utils.prepare_prefixed_columns: preserve nullable flags from
source table schema
Schema equality helper:
- schema_utils.get_compatible_type: add type1 == type2 equality check
for Python 3.10+ UnionType objects that are equal but not identical
Test fixtures updated across the test suite to use explicit nullable=False
schemas, matching what SourceStreamBuilder produces. Added pythonpath = src
to pytest.ini so the local source tree takes precedence over the editable
install when running tests.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Follow-up refactor: nullable responsibility moved to the ingestion boundary (468b217)

After discussion, the design has been tightened further; what changed is summarised in the commit message above.

Nullable-corruption sites fixed: several internal sites were resetting nullable flags; each fix is listed in the commit message.

Result: 3106 tests pass (2 more than before, from the new tests).
…st-hoc rebuild Pass `schema=` directly to `pa.Table.from_pylist()` using a schema derived from the input table (each field promoted to `list<T>`, nullable=False), replacing the two-step build-then-rebuild approach. Also drops the now-unused `arrow_utils` import. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…n sites
Four sites were producing Arrow fields with the default nullable=True even
though the data is never null:
- arrow_utils.prepare_prefixed_columns: prefix table was built with names=
(no schema), losing nullable flags. Now constructs an explicit schema;
columns that are genuinely all-null (unknown source info) keep
nullable=True, all others get nullable=False.
- datagram._ensure_context_table: pa.schema({key: type}) dict shorthand
always yields nullable=True. Replaced with pa.schema([pa.field(...,
nullable=False)]).
- ArrowTableStream.__init__: synthesised context column used pa.array()
with no nullable override. Now passes an explicit schema= to pa.table().
- ArrowTableStream.as_table: content-hash column appended via bare string
name, inheriting nullable=True. Replaced with pa.field(..., nullable=False)
as the first argument to append_column().
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… null counts prepare_prefixed_columns previously inferred nullable flags from null_count on the generated arrays. This is the wrong signal — null_count reflects data, not design intent, and causes fragile behaviour (e.g. a single non-null row in an otherwise-null pass-through column would incorrectly become nullable=False). New approach: - Concrete value supplied via source_info → nullable=False (value is never null) - Pass-through of an existing column → inherit nullable from source field - No value / key absent → nullable=True (column is all-null) - Safety guard: if actual data has nulls, promote to nullable=True regardless (handles e.g. join results that introduce nulls into previously non-null cols) Also adds an optional prefix_schemas parameter so callers can supply an explicit schema per prefix when the default inference is not appropriate. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ts_to_arrow_table SemanticSchemaConverter.python_dicts_to_arrow_table previously always used self.arrow_schema (pre-computed at construction). Adding an optional arrow_schema= parameter is consistent with UniversalTypeConverter's interface and with the prepare_prefixed_columns prefix_schemas pattern — callers that need to enforce specific field types or nullable flags can now do so without constructing a new converter instance. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…source SourceStreamBuilder.build() previously stripped all nullable flags unconditionally (respect_nullable=False default). This was misleading and error-prone: the flag lived inside a generic enrichment pipeline rather than at the source boundary where the decision belongs. New design: - build() is a pure enrichment pipeline — it trusts the incoming table's nullable flags as-is. No nullable opinion, no parameters. - Each source that receives a raw Arrow table (whose schema Arrow/Polars default to nullable=True regardless of content) explicitly calls arrow_utils.infer_schema_nullable(table) before handing off to build(). This infers nullable=False for null-free columns, nullable=True for columns that actually contain nulls. - Sources with an explicit data_schema (ListSource, DictSource) skip inference — the schema from python_schema_to_arrow_schema already has correct nullable flags based on Optional[T] annotations. Sources updated: ArrowTableSource, DataFrameSource, ListSource, DictSource, CsvSource, DbTableSource, DeltaTableSource. Also adds arrow_utils.infer_schema_nullable() as a standalone utility. Tests updated to reflect the new contract. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
When infer_nullable=True (default) nullable flags are derived from actual column data via infer_schema_nullable. Pass infer_nullable=False to trust the incoming table's schema as-is — useful when nullable flags have been set deliberately (e.g. Optional columns in a hand-crafted schema). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…TableSource
The test-objective helpers were constructing ArrowTableStream directly
with plain pa.table({...}), which defaults all fields to nullable=True.
This caused FunctionPod schema validation to fail (int | None vs int).
ArrowTableStream is internal — callers are responsible for providing
correctly-typed tables. ArrowTableSource is the correct public entry
point; it applies infer_schema_nullable automatically.
- test_function_pod, test_nodes, test_tracker: _make_stream helpers
now use ArrowTableSource (mirrors real usage, gets inference for free)
- test_stream: kept ArrowTableStream (it's a deliberate unit test for
that class), but table helpers now declare explicit nullable=False
schemas so the stream reflects the intended non-nullable types
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
This PR aligns OrcaPod’s Python Schema typing with Arrow schema nullability so that plain types map to nullable=False and T | None maps to nullable=True, enabling correct round-trips and consistent logical schema equality (via Starfix hashing). It also adds utilities and extensive tests to verify ordering-insensitive, nullability-sensitive schema equality.
Changes:

- Fix Python↔Arrow schema conversion to correctly encode/decode nullability (`T` vs `T | None`) and add `Schema.as_required()` for Arrow-structural comparisons.
- Add Arrow schema nullability helpers (`infer_schema_nullable`, `make_schema_non_nullable`) and update ingestion paths/sources to set nullable flags more deliberately.
- Add broad test coverage for nullability preservation and schema logical equality, and update many existing tests to use explicit `nullable=False` schemas.
Reviewed changes
Copilot reviewed 52 out of 53 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| `src/orcapod/semantic_types/universal_converter.py` | Derives Arrow field nullability from Python optional types and reconstructs `T \| None` from nullable fields. |
| `src/orcapod/types.py` | Adds `Schema.as_required()` to drop `optional_fields` metadata for structural comparisons. |
| `src/orcapod/utils/arrow_utils.py` | Preserves nullability in `prepare_prefixed_columns`; adds `make_schema_non_nullable()` and `infer_schema_nullable()`. |
| `src/orcapod/utils/schema_utils.py` | Treats `==`-equal type objects as compatible in `get_compatible_type`. |
| `src/orcapod/semantic_types/precomputed_converters.py` | Allows overriding the Arrow schema when building tables from Python dicts. |
| `src/orcapod/core/streams/arrow_table_stream.py` | Ensures constructed system/context/hash columns are explicitly non-nullable; documents respecting nullable flags when producing Python schemas. |
| `src/orcapod/core/sources/stream_builder.py` | Updates builder docs to "trust incoming nullable flags"; adds import for nullable helpers (currently unused in code). |
| `src/orcapod/core/sources/arrow_table_source.py` | Adds `infer_nullable` option (defaulting to inferring nullable from data) before building the source stream. |
| `src/orcapod/core/sources/list_source.py` | Accepts optional `data_schema` and infers nullable from data when schema isn't provided. |
| `src/orcapod/core/sources/dict_source.py` | Infers nullable from data when `data_schema` isn't provided. |
| `src/orcapod/core/sources/data_frame_source.py` | Converts Polars→Arrow then infers nullable from actual values before building. |
| `src/orcapod/core/sources/csv_source.py` | Infers nullable from CSV-loaded Arrow table values before building. |
| `src/orcapod/core/sources/db_table_source.py` | Infers nullable from DB-loaded Arrow table values before building. |
| `src/orcapod/core/sources/delta_table_source.py` | Infers nullable from Delta-loaded Arrow table values before building. |
| `src/orcapod/core/operators/join.py` | After Polars joins, attempts to restore `nullable=False` on selected columns before returning an ArrowTableStream. |
| `src/orcapod/core/operators/merge_join.py` | Rebuilds output schema with all fields `nullable=False` to avoid "spurious Optional" types after joins. |
| `src/orcapod/core/operators/semijoin.py` | Casts semi-join result back to left schema to restore nullable flags (adds unused import). |
| `src/orcapod/core/operators/batch.py` | Builds a list-valued batch table with an explicit schema and `nullable=False` fields. |
| `src/orcapod/core/datagrams/datagram.py` | Clarifies nullable semantics when materializing Python schema; makes context field explicitly non-nullable. |
| `tests/test_semantic_types/test_universal_converter.py` | Adds unit tests for nullability mapping and `Schema.as_required()`. |
| `tests/test_semantic_types/test_schema_arrow_equality.py` | New comprehensive suite validating Schema↔Arrow logical equality (order-insensitive, nullability-sensitive) and round-trips. |
| `tests/test_utils/__init__.py` | Adds test package init for utils tests. |
| `tests/test_utils/test_arrow_utils.py` | New tests ensuring `prepare_prefixed_columns()` preserves nullable flags in returned tables. |
| `tests/test_core/streams/test_streams.py` | Uses explicit non-nullable schemas in fixtures; updates assertion to accept `types.UnionType` for optional types; adds nullable preservation tests. |
| `tests/test_core/sources/test_stream_builder.py` | Adds tests that builder respects incoming nullable flags and that nullability affects schema hash. |
| `tests/test_core/datagrams/test_lazy_conversion.py` | Adds tests that `Datagram.schema()` respects Arrow nullable flags. |
| `tests/test_core/test_tracker.py` | Updates test streams to use explicit `nullable=False` schemas. |
| `tests/test_core/test_regression_fixes.py` | Updates streams/tables to use explicit `nullable=False` schemas. |
| `tests/test_core/sources/test_derived_source.py` | Ensures derived-source test table uses explicit non-nullable schema. |
| `tests/test_core/packet_function/test_executor.py` | Adds explicit non-nullable schema when building test streams. |
| `tests/test_core/operators/test_merge_join.py` | Updates merge-join test inputs to explicit Arrow arrays and non-nullable schema. |
| `tests/test_core/function_pod/test_simple_function_pod.py` | Updates test streams to attach explicit non-nullable schemas. |
| `tests/test_core/function_pod/test_pipeline_hash_integration.py` | Ensures schema hashing tests use explicit non-nullable schemas; avoids schema drift. |
| `tests/test_core/function_pod/test_function_pod_node.py` | Adds explicit schema (incl. `large_string`) and non-nullable flags in node/table setup. |
| `tests/test_core/function_pod/test_function_pod_node_stream.py` | Adds explicit non-nullable schema to test input table. |
| `tests/test_core/function_pod/test_function_pod_extended.py` | Uses explicit non-nullable schemas in stream fixtures; avoids default-nullable Arrow behavior. |
| `tests/test_core/function_pod/test_function_pod_decorator.py` | Adds explicit non-nullable schema in end-to-end test setup. |
| `tests/test_core/function_pod/test_function_node_caching.py` | Adds explicit non-nullable schema based on row keys. |
| `tests/test_core/function_pod/test_function_node_attach_db.py` | Adds explicit non-nullable schema in stream construction. |
| `tests/test_core/function_pod/test_cached_function_pod.py` | Adds explicit non-nullable schema based on row keys. |
| `tests/test_core/conftest.py` | Updates common stream fixtures to use explicit non-nullable schemas. |
| `tests/test_channels/test_pipeline_example.py` | Adds explicit non-nullable schemas to example pipeline sources. |
| `tests/test_channels/test_node_async_execute.py` | Adds explicit non-nullable schemas in async execution fixtures. |
| `tests/test_channels/test_native_async_operators.py` | Adds helper to build non-nullable schemas; updates fixtures to explicit arrays + schemas. |
| `tests/test_channels/test_copilot_review_issues.py` | Adds explicit non-nullable schema to test stream. |
| `tests/test_channels/test_async_execute.py` | Adds explicit non-nullable schemas to fixtures. |
| `test-objective/unit/test_tracker.py` | Switches from ArrowTableStream to ArrowTableSource in objective tests. |
| `test-objective/unit/test_stream.py` | Adds explicit non-nullable schemas to unit test tables. |
| `test-objective/unit/test_nodes.py` | Switches to ArrowTableSource for objective tests and updates joinable stream factories. |
| `test-objective/unit/test_function_pod.py` | Switches stream factories to ArrowTableSource in objective tests. |
| `superpowers/specs/2026-04-02-schema-arrow-equality-design.md` | Adds design spec documenting the nullability gap and intended semantics/tests. |
| `superpowers/plans/2026-04-02-schema-arrow-equality.md` | Adds implementation plan and TDD checklist for the change set. |
| `pytest.ini` | Adds `pythonpath = src` for test import resolution. |
src/orcapod/core/operators/join.py
Outdated
```python
new_fields = []
for field in result_table.schema:
    if field.name in non_nullable_cols:
        new_fields.append(
            pa.field(field.name, field.type, nullable=False, metadata=field.metadata)
        )
    else:
        new_fields.append(field)
new_schema = pa.schema(new_fields, metadata=result_table.schema.metadata)
result_table = result_table.cast(new_schema)
```
The join rebuild forces nullable=False for a broad set of output columns (essentially everything except _source_* / non-system datagram columns). This can (a) drop legitimate optionality from input schemas (nullable=True → T | None) and (b) raise during result_table.cast(new_schema) whenever any of those columns actually contains nulls (e.g., an Optional[...] packet field). Consider preserving nullable flags from the input tables/streams per column (e.g., maintain a name→Field map across joins and apply it after Polars join/rename), or at minimum derive nullable from the joined data (null_count) instead of unconditionally setting False.
```diff
-new_fields = []
-for field in result_table.schema:
-    if field.name in non_nullable_cols:
-        new_fields.append(
-            pa.field(field.name, field.type, nullable=False, metadata=field.metadata)
-        )
-    else:
-        new_fields.append(field)
-new_schema = pa.schema(new_fields, metadata=result_table.schema.metadata)
-result_table = result_table.cast(new_schema)
+# Only enforce nullable=False for columns that actually contain no nulls.
+# This avoids cast failures when optional fields legitimately have nulls.
+effective_non_nullable_cols: set[str] = set()
+for col_name in non_nullable_cols:
+    # result_table.column(...) returns a ChunkedArray with a null_count attribute.
+    if result_table.column(col_name).null_count == 0:
+        effective_non_nullable_cols.add(col_name)
+if effective_non_nullable_cols:
+    new_fields = []
+    for field in result_table.schema:
+        if field.name in effective_non_nullable_cols:
+            new_fields.append(
+                pa.field(
+                    field.name,
+                    field.type,
+                    nullable=False,
+                    metadata=field.metadata,
+                )
+            )
+        else:
+            new_fields.append(field)
+    new_schema = pa.schema(new_fields, metadata=result_table.schema.metadata)
+    result_table = result_table.cast(new_schema)
```
Fixed in commit 641473e. Replaced the hand-rolled nullable loop with a single result_table.cast(arrow_utils.infer_schema_nullable(result_table)) call. This derives nullable = (null_count > 0) per column — columns with no nulls (tag/packet fields after an inner join) become nullable=False, while columns that genuinely contain nulls (e.g. Optional fields, source-info columns after cross-stream joins) keep nullable=True, preventing the cast failure the old approach could trigger.
```python
# Rebuild schema with nullable=False to avoid spurious T | None types.
# pa.Table.cast() is not used here because list columns may fail validation.
new_schema = arrow_utils.make_schema_non_nullable(joined.schema)
```
Rebuilding the MergeJoin output schema with make_schema_non_nullable() sets every field to nullable=False, which can invalidate legitimately nullable/optional columns from either input stream and may produce incorrect output_schema() types (Optional fields become plain T). Instead, restore/merge nullable flags from the input schemas (or compute nullable per column from the joined arrays’ null_count) so optionality is preserved while still avoiding the “all nullable=True” Polars default.
```diff
-# Rebuild schema with nullable=False to avoid spurious T | None types.
-# pa.Table.cast() is not used here because list columns may fail validation.
-new_schema = arrow_utils.make_schema_non_nullable(joined.schema)
+# Rebuild schema with accurate nullability per column to avoid spurious
+# T | None types from Polars' "all nullable=True" default while preserving
+# legitimately nullable fields.
+updated_fields = []
+for i, field in enumerate(joined.schema):
+    col = joined.column(i)
+    # Mark field as non-nullable only if the data actually contains no nulls.
+    nullable = col.null_count > 0
+    updated_fields.append(field.with_nullable(nullable))
+new_schema = pa.schema(updated_fields, metadata=joined.schema.metadata)
```
Fixed in commit 641473e. Replaced make_schema_non_nullable(joined.schema) with infer_schema_nullable(joined), which derives nullable = (null_count > 0) per column. This preserves legitimately nullable/optional columns while still eliminating Polars' all-True default artifact. pa.Table.from_arrays is still used (rather than cast) to safely handle list-typed columns.
```diff
 from orcapod.protocols.core_protocols import PacketProtocol, StreamProtocol, TagProtocol
 from orcapod.types import ColumnConfig, Schema
-from orcapod.utils import schema_utils
+from orcapod.utils import arrow_utils, schema_utils
```
arrow_utils is imported but not referenced in this module (the implementation only uses schema_utils and pa). Dropping the unused import helps avoid lint failures and keeps dependencies clear.
```diff
-from orcapod.utils import arrow_utils, schema_utils
+from orcapod.utils import schema_utils
```
Fixed in commit 641473e. Removed the unused arrow_utils import from semijoin.py — only schema_utils is used at runtime.
```diff
 from orcapod.system_constants import constants
 from orcapod.types import ContentHash
-from orcapod.utils import arrow_data_utils
+from orcapod.utils import arrow_data_utils, arrow_utils
```
arrow_utils is imported but only mentioned in docstrings. If you’re not using it at runtime, consider removing the import to avoid unused-import lint failures (docstring references don’t require an import).
```diff
-from orcapod.utils import arrow_data_utils, arrow_utils
+from orcapod.utils import arrow_data_utils
```
Fixed in commit 641473e. Removed the unused arrow_utils import from stream_builder.py — only arrow_data_utils is used at runtime.
```python
def __init__(
    self,
    table: "pa.Table",
    tag_columns: Collection[str] = (),
    system_tag_columns: Collection[str] = (),
    record_id_column: str | None = None,
    infer_nullable: bool = True,
    **kwargs: Any,
) -> None:
    super().__init__(**kwargs)

    if infer_nullable:
        # Arrow tables default all fields to nullable=True regardless of
        # content; derive the correct flags from the actual data.
        table = table.cast(arrow_utils.infer_schema_nullable(table))
    # else: schema is trusted as-is — caller has set nullable deliberately.
```
infer_nullable=True will override any nullable flags already present on the incoming table by re-deriving them from null_count. This can unintentionally narrow a caller-supplied schema where a column is intentionally nullable=True even if the current sample has no nulls (i.e., optional-but-not-observed-yet). Consider defaulting infer_nullable to False, or only inferring when the incoming schema appears “raw/default” (e.g., all fields nullable=True with no explicit intent).
Fixed in commit 641473e. Changed the default to infer_nullable=False — the safer choice. ArrowTableSource is unique in accepting a pre-existing pa.Table whose nullable flags may already be deliberate (e.g. loaded from a typed Parquet file, or produced by an upstream pipeline stage). Defaulting to inference silently overwrites intentional nullable=True on columns that happen to have no nulls in the current sample, creating schema drift when sparser batches arrive later.
The new default trusts the incoming schema as-is. Callers with raw Arrow/Polars tables (all nullable=True by Arrow default) should pass infer_nullable=True explicitly — this makes the intent visible at the call site. All other sources (DictSource, ListSource, DataFrameSource, CSV/DB/Delta) still infer automatically since they construct their Arrow tables internally and Arrow's all-True default is never intentional there.
The design rationale is documented in the updated class docstring. All test helpers that used raw pa.table() were updated to pass infer_nullable=True explicitly.
1. ArrowTableSource: change `infer_nullable` default True→False.

   ArrowTableSource is unique in accepting a pre-existing `pa.Table` whose nullable flags may already be deliberate (e.g. loaded from Parquet or produced by an upstream pipeline stage). Defaulting to inference silently overwrites intentional nullable=True on columns that happen to have no nulls in the current sample, creating schema drift when sparser batches arrive later. The new default trusts the incoming schema as-is; callers with raw Arrow/Polars tables (all-nullable by Arrow default) should pass `infer_nullable=True` explicitly. All other sources (DictSource, ListSource, DataFrameSource, CSV/DB/Delta) still infer automatically because they construct their Arrow tables internally and Arrow's all-True default is never intentional there. Design rationale documented in the class docstring. All test helpers that used raw `pa.table()` updated to pass `infer_nullable=True`.

2. stream_builder.py: remove unused `arrow_utils` import.

3. semijoin.py: remove unused `arrow_utils` import.

4. merge_join.py: replace `make_schema_non_nullable` with `infer_schema_nullable`.

   The previous approach forced nullable=False on every field, which would incorrectly strip legitimate Optional columns. `infer_schema_nullable` derives nullable from actual null counts, preserving intentional optionality while still eliminating Polars' all-True default artifact.

5. join.py: replace hand-rolled nullable loop with `infer_schema_nullable`.

   Same rationale as merge_join: the manual loop forced nullable=False on a broad set of columns and would raise on cast if any of those columns actually contained nulls. `infer_schema_nullable` is safe for all cases.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round 2 — all comments addressed (commit 641473e)
Pull request overview
Copilot reviewed 84 out of 85 changed files in this pull request and generated 3 comments.
src/orcapod/utils/arrow_utils.py
Outdated
```python
    exclude_prefixes: Collection[str] = (),
    prefix_schemas: "dict[str, pa.Schema] | None" = None,
) -> tuple["pa.Table", dict[str, "pa.Table"]]:
    """ """
```
prepare_prefixed_columns has an empty docstring (""" """), and the signature now includes the new prefix_schemas parameter. Please update the docstring to describe what the function returns, how nullability is determined/preserved, and how prefix_schemas is expected to be shaped/used (incl. what happens if it doesn’t match the constructed columns).
| """ """ | |
| """ | |
| Split a table into non-prefixed columns and per-prefix tables. | |
| Columns whose names start with one of the configured prefixes are grouped | |
| into a separate output table for that prefix, with the matching prefix | |
| removed from each grouped column name. Columns that do not match any | |
| prefix remain in the returned data table. | |
| Args: | |
| table: The input ``pyarrow.Table`` or ``pyarrow.RecordBatch`` whose | |
| columns will be partitioned into a main data table and zero or more | |
| prefix-specific tables. | |
| prefix_info: Prefix configuration. This may be: | |
| * a collection of prefix strings, in which case each prefix is used | |
| with no additional per-prefix metadata; or | |
| * a mapping from prefix string to metadata/``None``; or | |
| * a mapping from prefix string to a mapping of per-column metadata. | |
| The exact metadata values are consumed by the implementation when | |
| constructing the per-prefix tables, but the keys define the set of | |
| prefixes that will be recognized. | |
| exclude_columns: Column names to omit from the constructed output. | |
| exclude_prefixes: Prefix groups to skip entirely even if matching input | |
| columns are present. | |
| prefix_schemas: Optional mapping from prefix string to the | |
| ``pyarrow.Schema`` that should be used when constructing that | |
| prefix's output table. Each schema is expected to describe the | |
| columns after the prefix has been removed (that is, it should match | |
| the constructed column names, not the original names in ``table``). | |
| When provided, the schema is used to preserve/define field order, | |
| types, and nullability for that prefix table. If a provided schema | |
| does not match the columns that are actually constructed for the | |
| prefix (for example, missing fields, extra fields, or incompatible | |
| names/types), table construction is not silently reconciled; the | |
| underlying ``pyarrow`` schema/table operation will fail. | |
| Returns: | |
| A tuple ``(data_table, prefixed_tables)`` where ``data_table`` is a | |
| ``pyarrow.Table`` containing the columns from ``table`` that were not | |
| assigned to any prefix group, and ``prefixed_tables`` is a dictionary | |
| mapping each recognized prefix to its constructed ``pyarrow.Table``. | |
| Nullability is preserved from the source columns for columns copied | |
| directly from ``table``. For columns created or reordered under an | |
| explicit schema, the resulting field nullability follows the | |
| corresponding ``pyarrow.Schema``/field definition used for that output. | |
| """ |
Fixed in ff693d1. Replaced the empty """ """ docstring with a full docstring covering: what the function does, all parameters (including the shapes accepted by prefix_info and the contract for prefix_schemas — including what happens when the schema doesn't match the constructed columns), and the return value with nullability semantics.
src/orcapod/utils/arrow_utils.py
Outdated
    on every field.
    """
    return pa.schema(
        [pa.field(f.name, f.type, nullable=False, metadata=f.metadata) for f in schema]
make_schema_non_nullable() currently drops schema-level metadata because it builds the new schema without passing metadata=schema.metadata. This can silently strip extension/pandas metadata when callers cast/rebuild tables. Preserve schema.metadata on the returned schema (similar to normalize_schema_to_large_types).
-        [pa.field(f.name, f.type, nullable=False, metadata=f.metadata) for f in schema]
+        [pa.field(f.name, f.type, nullable=False, metadata=f.metadata) for f in schema],
+        metadata=schema.metadata,
Fixed in ff693d1. Added metadata=schema.metadata as the second argument to pa.schema() in make_schema_non_nullable. Field-level metadata (metadata=f.metadata) was already preserved; only the schema-level metadata was missing.
src/orcapod/utils/arrow_utils.py
Outdated
    return pa.schema([
        pa.field(field.name, field.type, nullable=table.column(i).null_count > 0, metadata=field.metadata)
        for i, field in enumerate(table.schema)
    ])
infer_schema_nullable() also drops schema-level metadata (it doesn’t pass metadata=table.schema.metadata when constructing the new pa.Schema). Since this helper is used in joins and sources, metadata loss can propagate widely; consider preserving table.schema.metadata on the inferred schema.
-    return pa.schema([
-        pa.field(field.name, field.type, nullable=table.column(i).null_count > 0, metadata=field.metadata)
-        for i, field in enumerate(table.schema)
-    ])
+    return pa.schema(
+        [
+            pa.field(
+                field.name,
+                field.type,
+                nullable=table.column(i).null_count > 0,
+                metadata=field.metadata,
+            )
+            for i, field in enumerate(table.schema)
+        ],
+        metadata=table.schema.metadata,
+    )
Fixed in ff693d1. Added metadata=table.schema.metadata to the pa.schema() call in infer_schema_nullable, and reformatted the list comprehension to multi-line for readability. Field-level metadata (metadata=field.metadata) was already preserved; only the schema-level metadata was missing.
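The inference rule itself is a simple per-column null check: a field is marked nullable only when a null is actually present, mirroring `null_count > 0` on an Arrow column. Sketched over plain Python lists (illustrative stand-in, not the pyarrow implementation):

```python
def infer_nullable_flags(columns: dict[str, list]) -> dict[str, bool]:
    # nullable=True only for columns that genuinely contain a null value,
    # mirroring Arrow's per-column `null_count > 0` test.
    return {name: any(v is None for v in values) for name, values in columns.items()}


flags = infer_nullable_flags({"a": [1, 2, 3], "b": [1, None, 3]})
```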
…ixed_columns

- make_schema_non_nullable: pass metadata=schema.metadata to pa.schema()
- infer_schema_nullable: pass metadata=table.schema.metadata to pa.schema() (field-level metadata was already preserved; only schema-level was missing)
- prepare_prefixed_columns: replace empty docstring with full Args/Returns documentation covering prefix_info shapes, prefix_schemas contract, and nullability behaviour

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round 2 — all 3 comments addressed (commit ff693d1)

All three Copilot comments were in src/orcapod/utils/arrow_utils.py.
Add TestMakeSchemaNotNullable and TestInferSchemaNullable to tests/test_utils/test_arrow_utils.py covering:

- nullable flag inference/forcing is correct
- field-level metadata is preserved
- schema-level metadata is preserved (the gap Copilot caught)
- None/empty metadata on the source stays None/empty

These tests would have caught the schema-level metadata drop before ff693d1.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
eywalker
left a comment
Generally there has been quite a lot of wasteful Arrow table construction from columns with explicit per-field data types AND explicit schema info (which again includes the column data types). In these cases, consider simply constructing the Arrow table from a pydict or pylist together with the schema.
@@ -258,7 +263,9 @@ def _ensure_context_table(self) -> pa.Table:
        if self._context_table is None:
            import pyarrow as _pa
pa is already lazy-imported, so do not import it again. Make sure we do not use this pattern elsewhere either.
Fixed in f31474b. Removed the import pyarrow as _pa inside _ensure_context_table and replaced all _pa. references with the module-level lazy-imported pa.
src/orcapod/core/operators/join.py
Outdated
  from orcapod.system_constants import constants
  from orcapod.types import ColumnConfig, ContentHash, Schema
- from orcapod.utils import arrow_data_utils, schema_utils
+ from orcapod.utils import arrow_data_utils, arrow_utils, schema_utils
what's the explicit difference between arrow_utils and arrow_data_utils?
Fixed in f31474b. Consolidated the two modules: all functions from arrow_data_utils.py have been moved into arrow_utils.py and arrow_data_utils.py deleted. The split was confusing — both modules dealt with Arrow tables but the distinction was not meaningful. All callers (join.py, merge_join.py, stream_builder.py, test_operators.py) now import from arrow_utils only.
| # "spurious T | None from Polars default" and the opposite mistake of | ||
| # forcing nullable=False on columns that actually have nulls. | ||
| # pa.Table.cast() is not used here because list columns may fail validation. | ||
| inferred_schema = arrow_utils.infer_schema_nullable(joined) |
this is a hack around the fact that Polars is used here, and it fundamentally fails to account for the original nullability of the columns, which should be obtained and used to infer the resultant table's column nullability. If nullability info is lost when going through Polars, that information must be captured prior to the conversion, and the proper nullability then computed/derived from the input streams' schemas along with their original nullable flags.
Fixed in f31474b. Nullable flags are now captured from both input table schemas BEFORE the Polars join (which discards nullable info). An output_nullable dict is built from left/right schemas; merged (colliding) columns are marked nullable=False since they always produce a 2-element list. The final schema is built from output_nullable with a data-based fallback only for unexpected columns.
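The pattern in the fix — capture nullability before handing tables to a library that discards it, then reapply it afterwards — can be sketched with plain dicts mapping column name to nullable flag. Here `merged_columns` marks the colliding names that the join always wraps in a 2-element list (hence nullable=False); the names are illustrative, not the actual merge_join code:

```python
def output_nullable_flags(
    left: dict[str, bool],
    right: dict[str, bool],
    merged_columns: set[str],
) -> dict[str, bool]:
    # Start from the union of both input schemas' nullable flags, captured
    # BEFORE the join (the join library never sees them, so it cannot lose
    # what it never had) ...
    out = {**left, **right}
    # ... then override colliding columns: the join produces a 2-element
    # list value that is always present, so those are never nullable.
    for name in merged_columns:
        out[name] = False
    return out


flags = output_nullable_flags(
    left={"key": False, "x": True},
    right={"key": False, "y": False},
    merged_columns={"key"},
)
```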
    # Derive nullable from actual data: nullable=True only for columns
    # that genuinely contain nulls. Opt-in because the caller may have
    # set nullable flags deliberately and we should not override them.
    table = table.cast(arrow_utils.infer_schema_nullable(table))
Fixed in f31474b. Added import logging and logger = logging.getLogger(__name__) at the module level. The if infer_nullable: block now emits a logger.debug message before casting.
    The file is read once at construction time using PyArrow's CSV reader,
    converted to an Arrow table, and enriched by ``SourceStreamBuilder``
    (source-info, schema-hash, system tags).
All sources that first convert their data into an Arrow table must accept an optional schema / Arrow-schema parameter that lets the user properly specify the data types and nullability of the data. When a schema is passed in, infer_schema_nullable should not be run; the provided schema should be used instead.
Fixed in f31474b. Added schema: pa.Schema | None = None parameter to CSVSource.__init__. When provided, the table is cast to the caller-supplied schema instead of running infer_schema_nullable.
        self,
        data: list[dict[str, Any]],
        arrow_schema: "pa.Schema | None" = None,
    ) -> "pa.Table":
remove quotes around type hints
Fixed in f31474b. Removed the redundant quotes. The file already has from __future__ import annotations, making string-quoted annotations unnecessary.
Fixed in 098d653. Removed quotes from the arrow_schema parameter type and the return type — the file already has from __future__ import annotations.
src/orcapod/utils/arrow_utils.py
Outdated
        | Mapping[str, Mapping[str, Any | None]],
        exclude_columns: Collection[str] = (),
        exclude_prefixes: Collection[str] = (),
        prefix_schemas: "dict[str, pa.Schema] | None" = None,
why is this override even necessary? The nature of the column (type and nullability) is completely specified in the original table/record batch's schema. I don't see the point of accepting an override?
Fixed in f31474b. Removed the prefix_schemas parameter entirely. Nullable flags are now derived directly from the source column's field.nullable on pass-through columns, set to False for freshly constructed columns with a present value, and set to True for all-null columns. No override needed.
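The core behavior of `prepare_prefixed_columns` — partition columns by prefix and strip the matched prefix from grouped names — is easy to sketch over plain dicts. This ignores the Arrow specifics, exclusions, and the nullable bookkeeping; the names are illustrative:

```python
def split_by_prefix(
    columns: dict[str, list],
    prefixes: tuple[str, ...],
) -> tuple[dict[str, list], dict[str, dict[str, list]]]:
    data: dict[str, list] = {}
    grouped: dict[str, dict[str, list]] = {p: {} for p in prefixes}
    for name, values in columns.items():
        for p in prefixes:
            if name.startswith(p):
                # Strip the matched prefix from the grouped column name.
                grouped[p][name[len(p):]] = values
                break
        else:
            # No prefix matched: the column stays in the main data table.
            data[name] = values
    return data, grouped


data, grouped = split_by_prefix(
    {"x": [1], "_tag_a": [2], "_tag_b": [3], "_sys_id": [4]},
    prefixes=("_tag_", "_sys_"),
)
```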
src/orcapod/utils/arrow_utils.py
Outdated
    # Use caller-supplied schema if provided; otherwise derive from intent
    # recorded during column construction (value present → nullable=False,
    # pass-through → inherit source field, all-null → nullable=True).
    if prefix_schemas and prefix in prefix_schemas:
I find the treatment here unnecessarily complicated
Fixed in f31474b. Simplified the per-prefix table construction loop: each column records col_nullable (False for new values, inherited for pass-through, True for all-null) during column construction, then pa.schema is built from those flags at table assembly time. No prefix_schemas override, no branching needed.
src/orcapod/utils/schema_utils.py
Outdated
        return type1

    # Handle equal types (e.g., two separate `int | None` union objects)
    if type1 == type2:
group this with above to simply be type1 is type2 or type1 == type2
Fixed in f31474b. Combined the two consecutive identity checks into if type1 is type2 or type1 == type2: return type1.
| pa.field("x", pa.int64(), nullable=False), | ||
| ] | ||
| ) | ||
| table = pa.table( |
since the schema is being given, consider simply constructing this from a pylist or pydict — here, passing both the schema and the per-column data types is redundant
Fixed in f31474b. Removed the explicit pa.array(..., type=X) wrappers from _make_node_with_system_tags — since a schema is already supplied to pa.table(), specifying the type again on each array is redundant.
Consolidation
- Merge arrow_data_utils into arrow_utils (Option A); delete arrow_data_utils.py
- Update all callers (join.py, merge_join.py, stream_builder.py, test_operators.py)

Sources — optional schema / authoritative nullability
- ArrowTableSource: add logger; log when infer_nullable=True
- CSVSource: add schema param; use it instead of infer_schema_nullable when given
- DataFrameSource: add schema param; same pattern
- DictSource: add arrow_schema param; pass through to type converter; skip inference when given
- ListSource: update inline comment documenting the canonical inference pattern
- DeltaTableSource: remove infer_schema_nullable — Delta log already carries authoritative nullable flags
- DBTableSource: use connector.get_column_info() for nullable flags; fall back to inference only when no column info is available

Operators
- merge_join: capture left/right nullable from input schemas before Polars join (Polars discards nullable info); derive output schema from captured flags instead of data-based null-count inference
- Remove prefix_schemas parameter from prepare_prefixed_columns (unused externally); simplify the result_tables construction loop

Code quality
- datagram.py: remove local `import pyarrow as _pa`; use module-level lazy pa
- schema_utils.get_compatible_type: combine two identity-check ifs into one
- precomputed_converters: remove redundant quotes around pa.Schema type hint (file has from __future__ import annotations)
- test_function_pod_node.py: remove per-column pa.array types when schema is given

Tests
- Add TestDeltaTableSourceNullability with delta_mixed_nullable_path fixture verifying non-nullable Delta columns map to T and nullable to T | None

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Review round 3 — all comments addressed (commit f31474b)

Consolidation
Sources — optional schema / authoritative nullability
Operators
Code quality
New test

3057 tests pass, 56 skipped (23 pre-existing failures in ray/graphviz modules unrelated to this PR).
… deleted arrow_data_utils

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…e DDL

The function double_response expects non-nullable int/float, but the PostgreSQL columns were nullable by default. get_column_info correctly returns nullable=True for those columns, causing schema compatibility failure. Match the SQLite fix applied in the previous round.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
| """ | ||
| if self._data_python_schema is None: | ||
| assert self._data_table is not None | ||
| raw_schema = self._data_table.schema |
what's the point of having an intermediate variable?
Fixed in 098d653 (and followup). Removed the intermediate variable — inlined directly into the call.
        system_tag_columns: Collection[str] = (),
        record_id_column: str | None = None,
        source_id: str | None = None,
        schema: "pa.Schema | None" = None,
remove quotes around type hints
Fixed in 098d653. Removed quotes from schema: pa.Schema | None — the file already has from __future__ import annotations so the quotes were redundant. Also removed quotes from the from_config return type.
        tag_columns: str | Collection[str] = (),
        system_tag_columns: Collection[str] = (),
        source_id: str | None = None,
        schema: "pa.Schema | None" = None,
we should be using from __future__ import annotations. Consequently should have no need or use of quote-based typing
Fixed in 098d653. The file already has from __future__ import annotations — removed quotes from FrameInitTypes, pa.Schema | None, and the from_config return type.
        tag_columns: Collection[str] = (),
        system_tag_columns: Collection[str] = (),
        data_schema: SchemaLike | None = None,
        arrow_schema: "pa.Schema | None" = None,
Fixed in 098d653. Removed quotes from the arrow_schema type hint (and from the from_config return type) — the file already has from __future__ import annotations.
        tag_columns: Collection[str] = (),
        system_tag_columns: Collection[str] = (),
        data_schema: SchemaLike | None = None,
        arrow_schema: "pa.Schema | None" = None,
Actually, isn't this redundant given that we accept data_schema already? That should already contain enough info (in fact, SchemaLike could be an Arrow schema as well, no?)
Fixed in 098d653. Removed the separate arrow_schema parameter from DictSource.__init__. Extended data_schema to accept SchemaLike | pa.Schema | None — when a pa.Schema is passed, it is routed directly to python_dicts_to_arrow_table as arrow_schema, skipping the Python-schema path and infer_schema_nullable. When a Python SchemaLike (or None) is passed, the existing logic applies.
        self,
        data: list[dict[str, Any]],
        arrow_schema: pa.Schema | None = None,
    ) -> "pa.Table":
still has quotes around pa.Table remove them
Fixed in 098d653. Removed quotes from -> pa.Table: — the file already has from __future__ import annotations at line 9, making string-quoted forward references redundant.
…chema param

- precomputed_converters.py: remove -> "pa.Table" quotes (from __future__ import annotations already present); inline struct_dicts intermediate variable
- csv_source.py: remove quotes from schema: "pa.Schema | None" and -> "CSVSource"
- data_frame_source.py: remove quotes from "FrameInitTypes", "pa.Schema | None", and -> "DataFrameSource"
- dict_source.py: remove arrow_schema parameter (redundant given data_schema); data_schema now accepts SchemaLike | pa.Schema so callers pass Arrow schemas via data_schema directly; remove quotes from -> "DictSource"

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Round 4 review — addressed (commits 098d653, 12e1fd3)

Quote removal (all files already have from __future__ import annotations): precomputed_converters.py, csv_source.py, data_frame_source.py, dict_source.py
Intermediate variables inlined: struct_dicts (precomputed_converters.py) and raw_schema
Redundant arrow_schema parameter removed from DictSource — data_schema now accepts SchemaLike | pa.Schema directly
Summary
- Updated `python_schema_to_arrow_schema` to set `nullable=False` for plain types and `nullable=True` for `T | None` — closes the nullability gap flagged by the existing comment "nullability handled at field level"
- Updated `arrow_schema_to_python_schema` to reconstruct `T | None` from `nullable=True` fields, completing the bidirectional round-trip
- Added `Schema.as_required()` to strip `optional_fields` and expose the structural (Arrow-level) view for schema comparison
- Added `make_schema_non_nullable()` utility helper for callers converting raw Arrow tables to Python schemas
- Tests: new tests in `test_universal_converter.py` + 42 in new `test_schema_arrow_equality.py` covering equality, field ordering, nullability, round-trip, and complex/nested types
- `uv run pytest tests/test_semantic_types/test_schema_arrow_equality.py -v` — 42/42 pass
- `uv run pytest tests/test_semantic_types/test_universal_converter.py -v` — all pass, including 7 new nullability tests
- `uv run pytest tests/ -q` — 3095 passed, 56 skipped, 0 failures

Linear
Closes PLT-923
🤖 Generated with Claude Code