diff --git a/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py b/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py index 63e482efe..ca11fb673 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py @@ -14,7 +14,7 @@ class DataFusion(Dialect): DIALECT = "datafusion" NORMALIZE_FUNCTIONS: bool | str = "lower" - NORMALIZATION_STRATEGY = NormalizationStrategy.LOWERCASE + NORMALIZATION_STRATEGY = NormalizationStrategy.CASE_SENSITIVE NULL_ORDERING = "nulls_are_last" INDEX_OFFSET = 0 TYPED_DIVISION = True diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index cb56147a0..5f12c00d0 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -512,8 +512,8 @@ def test_branch_snapshot_id_resolves(): catalog = MagicMock() mock_snapshot = MagicMock() mock_snapshot.snapshot_id = 123 - catalog.load_table.return_value.snapshot_by_name.side_effect = ( - lambda name: mock_snapshot if name == "my-branch" else None + catalog.load_table.return_value.snapshot_by_name.side_effect = lambda name: ( + mock_snapshot if name == "my-branch" else None ) loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") @@ -700,6 +700,82 @@ def transform(self, table, context): return f"SELECT id, name, value FROM {to_sql_identifier(table)}" +MIXED_CASE_SCHEMA = Schema( + NestedField(field_id=1, name="purchaseAmount", field_type=DoubleType(), required=False), + NestedField(field_id=2, name="itemCount", field_type=LongType(), required=False), + NestedField(field_id=3, name="discountRate", field_type=DoubleType(), required=False), +) + +MIXED_CASE_DATA = { + "purchaseAmount": [19.99, 49.95, 9.99], + "itemCount": [2, 5, 1], + "discountRate": [0.1, 0.2, 0.0], +} + + +class _MixedCaseTransformer(TableTransformer): + """Transformer that selects mixed-case columns.""" + + def __init__(self): + super().__init__(dialect="datafusion") + + def transform(self, table, context): + return f'SELECT "purchaseAmount", "itemCount", "discountRate" FROM {to_sql_identifier(table)}' + + +def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): + """Transformer with mixed-case columns preserves original casing in Iceberg scan. + + camelCase columns like purchaseAmount, itemCount, discountRate would lose + their casing under a lowercasing dialect, breaking Iceberg field lookups. + """ + catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) + mock_table = catalog.load_table.return_value + + loader = OpenHouseDataLoader( + catalog=catalog, + database="db", + table="tbl", + columns=["purchaseAmount", "itemCount"], + context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), + ) + result = _materialize(loader) + + assert result.num_rows == 3 + # Verify scan received original camelCase names, not lowercased + scan_kwargs = mock_table.scan.call_args.kwargs + selected = scan_kwargs["selected_fields"] + assert "purchaseAmount" in selected + assert "itemCount" in selected + assert "purchaseamount" not in selected + assert "itemcount" not in selected + + +def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): + """Pushed-down filter column names preserve original mixed-case casing. + + Filtering on itemCount must not cause it to appear as 'itemcount' in the scan. + """ + catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) + mock_table = catalog.load_table.return_value + + loader = OpenHouseDataLoader( + catalog=catalog, + database="db", + table="tbl", + filters=col("itemCount") > 3, + context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), + ) + _materialize(loader) + + scan_kwargs = mock_table.scan.call_args.kwargs + selected = scan_kwargs["selected_fields"] + # All projected columns must use original camelCase from the Iceberg schema + allowed = {"purchaseAmount", "itemCount", "discountRate"} + for field in selected: + assert field in allowed, f"Unexpected field '{field}' — likely lowercased" + + @pytest.mark.parametrize( "filter_expr, expected_names", [