From 940747a7f25d3b09e6a0551711762d9f652e48c7 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Wed, 8 Apr 2026 20:18:16 +0000 Subject: [PATCH 1/4] fix: preserve column casing in DataFusion SQL dialect to fix camelCase column lookups The DataFusion dialect's NORMALIZATION_STRATEGY was set to LOWERCASE, causing sqlglot to lowercase all identifiers during SQL optimization. This broke tables with camelCase columns (e.g. viewerId, feedPosition) because both DataFusion execution and PyIceberg scans are case-sensitive. Change the strategy to CASE_SENSITIVE, which matches DataFusion's actual behavior and preserves original identifier casing throughout the pipeline. Co-Authored-By: Claude Opus 4.6 --- .../openhouse/dataloader/datafusion_sql.py | 2 +- .../dataloader/tests/test_data_loader.py | 72 ++++++++++++++++++- 2 files changed, 71 insertions(+), 3 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py b/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py index 63e482efe..ca11fb673 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/datafusion_sql.py @@ -14,7 +14,7 @@ class DataFusion(Dialect): DIALECT = "datafusion" NORMALIZE_FUNCTIONS: bool | str = "lower" - NORMALIZATION_STRATEGY = NormalizationStrategy.LOWERCASE + NORMALIZATION_STRATEGY = NormalizationStrategy.CASE_SENSITIVE NULL_ORDERING = "nulls_are_last" INDEX_OFFSET = 0 TYPED_DIVISION = True diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index cb56147a0..4101dad47 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -512,8 +512,8 @@ def test_branch_snapshot_id_resolves(): catalog = MagicMock() mock_snapshot = MagicMock() mock_snapshot.snapshot_id = 123 - catalog.load_table.return_value.snapshot_by_name.side_effect = ( - lambda name: mock_snapshot if name == "my-branch" else None + catalog.load_table.return_value.snapshot_by_name.side_effect = lambda name: ( + mock_snapshot if name == "my-branch" else None ) loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", branch="my-branch") @@ -700,6 +700,74 @@ def transform(self, table, context): return f"SELECT id, name, value FROM {to_sql_identifier(table)}" +MIXED_CASE_SCHEMA = Schema( + NestedField(field_id=1, name="accountId", field_type=LongType(), required=False), + NestedField(field_id=2, name="pageViewCount", field_type=LongType(), required=False), + NestedField(field_id=3, name="avgSessionLength", field_type=DoubleType(), required=False), +) + +MIXED_CASE_DATA = { + "accountId": [1, 2, 3], + "pageViewCount": [10, 20, 30], + "avgSessionLength": [1.5, 2.5, 3.5], +} + + +class _MixedCaseTransformer(TableTransformer): + """Transformer that selects mixed-case columns.""" + + def __init__(self): + super().__init__(dialect="datafusion") + + def transform(self, table, context): + return f'SELECT "accountId", "pageViewCount", "avgSessionLength" FROM {to_sql_identifier(table)}' + + +def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): + """Transformer with mixed-case columns preserves original casing in Iceberg scan.""" + catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) + mock_table = catalog.load_table.return_value + + loader = OpenHouseDataLoader( + catalog=catalog, + database="db", + table="tbl", + columns=["accountId", "pageViewCount"], + context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), + ) + result = _materialize(loader) + + assert result.num_rows == 3 + # Verify scan received original field names, not lowercased + scan_kwargs = mock_table.scan.call_args.kwargs + selected = scan_kwargs["selected_fields"] + assert "accountId" in selected + assert "pageViewCount" in selected + assert "accountid" not in selected + assert "pageviewcount" not in selected + + +def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): + """Pushed-down filter column names preserve original mixed-case casing.""" + catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) + mock_table = catalog.load_table.return_value + + loader = OpenHouseDataLoader( + catalog=catalog, + database="db", + table="tbl", + filters=col("pageViewCount") > 15, + context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), + ) + _materialize(loader) + + scan_kwargs = mock_table.scan.call_args.kwargs + selected = scan_kwargs["selected_fields"] + # All projected columns must use original casing from Iceberg schema + for field in selected: + assert field in ("accountId", "pageViewCount", "avgSessionLength"), f"Unexpected field: {field}" + + @pytest.mark.parametrize( "filter_expr, expected_names", [ From 283f8cba2153d6f233fedc793dbb50c9a20e9b76 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Wed, 8 Apr 2026 20:25:54 +0000 Subject: [PATCH 2/4] test: use colliding column names (userId/USERID/UserID) in mixed-case tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Makes the test data truly ambiguous — all three columns lowercase to "userid", so a lowercasing dialect would collapse them into one column. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 44 +++++++++++-------- 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 4101dad47..3c1c9696d 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -701,15 +701,15 @@ def transform(self, table, context): MIXED_CASE_SCHEMA = Schema( - NestedField(field_id=1, name="accountId", field_type=LongType(), required=False), - NestedField(field_id=2, name="pageViewCount", field_type=LongType(), required=False), - NestedField(field_id=3, name="avgSessionLength", field_type=DoubleType(), required=False), + NestedField(field_id=1, name="userId", field_type=LongType(), required=False), + NestedField(field_id=2, name="USERID", field_type=LongType(), required=False), + NestedField(field_id=3, name="UserID", field_type=DoubleType(), required=False), ) MIXED_CASE_DATA = { - "accountId": [1, 2, 3], - "pageViewCount": [10, 20, 30], - "avgSessionLength": [1.5, 2.5, 3.5], + "userId": [1, 2, 3], + "USERID": [100, 200, 300], + "UserID": [1.5, 2.5, 3.5], } @@ -720,11 +720,15 @@ def __init__(self): super().__init__(dialect="datafusion") def transform(self, table, context): - return f'SELECT "accountId", "pageViewCount", "avgSessionLength" FROM {to_sql_identifier(table)}' + return f'SELECT "userId", "USERID", "UserID" FROM {to_sql_identifier(table)}' def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): - """Transformer with mixed-case columns preserves original casing in Iceberg scan.""" + """Transformer with mixed-case columns preserves original casing in Iceberg scan. + + All three columns (userId, USERID, UserID) lowercase to the same string, + so a lowercasing dialect would collapse them into a single column. + """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -732,23 +736,26 @@ def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): catalog=catalog, database="db", table="tbl", - columns=["accountId", "pageViewCount"], + columns=["userId", "USERID"], context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) result = _materialize(loader) assert result.num_rows == 3 - # Verify scan received original field names, not lowercased + # Verify scan received exact field names, not lowercased scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] - assert "accountId" in selected - assert "pageViewCount" in selected - assert "accountid" not in selected - assert "pageviewcount" not in selected + assert "userId" in selected + assert "USERID" in selected + # A lowercasing dialect would produce "userid" for both — must not happen + assert selected.count("userid") == 0 def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): - """Pushed-down filter column names preserve original mixed-case casing.""" + """Pushed-down filter column names preserve original mixed-case casing. + + Filtering on USERID must not cause it to appear as 'userid' in the scan. + """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -756,16 +763,17 @@ def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): catalog=catalog, database="db", table="tbl", - filters=col("pageViewCount") > 15, + filters=col("USERID") > 150, context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) _materialize(loader) scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] - # All projected columns must use original casing from Iceberg schema + # All projected columns must use exact casing from the Iceberg schema + allowed = {"userId", "USERID", "UserID"} for field in selected: - assert field in ("accountId", "pageViewCount", "avgSessionLength"), f"Unexpected field: {field}" + assert field in allowed, f"Unexpected field '{field}' — likely lowercased" @pytest.mark.parametrize( From 03e095d5dcd1ad81ea2e88578a6d215fa6c15e0c Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Wed, 8 Apr 2026 20:29:37 +0000 Subject: [PATCH 3/4] test: use descriptive column names (purchaseAmount variants) in mixed-case tests Renames generic userId/USERID/UserID to purchaseAmount/PURCHASEAMOUNT/ PurchaseAmount for better readability while preserving the case-collision property that makes the tests meaningful. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 3c1c9696d..9ccdd88d9 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -701,15 +701,15 @@ def transform(self, table, context): MIXED_CASE_SCHEMA = Schema( - NestedField(field_id=1, name="userId", field_type=LongType(), required=False), - NestedField(field_id=2, name="USERID", field_type=LongType(), required=False), - NestedField(field_id=3, name="UserID", field_type=DoubleType(), required=False), + NestedField(field_id=1, name="purchaseAmount", field_type=DoubleType(), required=False), + NestedField(field_id=2, name="PURCHASEAMOUNT", field_type=LongType(), required=False), + NestedField(field_id=3, name="PurchaseAmount", field_type=DoubleType(), required=False), ) MIXED_CASE_DATA = { - "userId": [1, 2, 3], - "USERID": [100, 200, 300], - "UserID": [1.5, 2.5, 3.5], + "purchaseAmount": [19.99, 49.95, 9.99], + "PURCHASEAMOUNT": [100, 200, 300], + "PurchaseAmount": [29.99, 59.95, 14.99], } @@ -720,14 +720,14 @@ def __init__(self): super().__init__(dialect="datafusion") def transform(self, table, context): - return f'SELECT "userId", "USERID", "UserID" FROM {to_sql_identifier(table)}' + return f'SELECT "purchaseAmount", "PURCHASEAMOUNT", "PurchaseAmount" FROM {to_sql_identifier(table)}' def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): """Transformer with mixed-case columns preserves original casing in Iceberg scan. - All three columns (userId, USERID, UserID) lowercase to the same string, - so a lowercasing dialect would collapse them into a single column. + All three columns (purchaseAmount, PURCHASEAMOUNT, PurchaseAmount) lowercase + to the same string, so a lowercasing dialect would collapse them. """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -736,7 +736,7 @@ def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): catalog=catalog, database="db", table="tbl", - columns=["userId", "USERID"], + columns=["purchaseAmount", "PURCHASEAMOUNT"], context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) result = _materialize(loader) @@ -745,16 +745,16 @@ def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): # Verify scan received exact field names, not lowercased scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] - assert "userId" in selected - assert "USERID" in selected - # A lowercasing dialect would produce "userid" for both — must not happen - assert selected.count("userid") == 0 + assert "purchaseAmount" in selected + assert "PURCHASEAMOUNT" in selected + # A lowercasing dialect would produce "purchaseamount" for both — must not happen + assert selected.count("purchaseamount") == 0 def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): """Pushed-down filter column names preserve original mixed-case casing. - Filtering on USERID must not cause it to appear as 'userid' in the scan. + Filtering on PURCHASEAMOUNT must not cause it to appear as 'purchaseamount'. """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -763,7 +763,7 @@ def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): catalog=catalog, database="db", table="tbl", - filters=col("USERID") > 150, + filters=col("PURCHASEAMOUNT") > 150, context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) _materialize(loader) @@ -771,7 +771,7 @@ def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] # All projected columns must use exact casing from the Iceberg schema - allowed = {"userId", "USERID", "UserID"} + allowed = {"purchaseAmount", "PURCHASEAMOUNT", "PurchaseAmount"} for field in selected: assert field in allowed, f"Unexpected field '{field}' — likely lowercased" From ff6b3b51769bd433cf23ad20df6ee8a12c6db3c1 Mon Sep 17 00:00:00 2001 From: ShreyeshArangath Date: Wed, 8 Apr 2026 20:31:48 +0000 Subject: [PATCH 4/4] test: use distinct sales-themed camelCase columns in mixed-case tests Replaces colliding casing variants (purchaseAmount/PURCHASEAMOUNT/ PurchaseAmount) with distinct descriptive columns (purchaseAmount, itemCount, discountRate) that better represent a real schema. Co-Authored-By: Claude Opus 4.6 --- .../dataloader/tests/test_data_loader.py | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 9ccdd88d9..5f12c00d0 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -702,14 +702,14 @@ def transform(self, table, context): MIXED_CASE_SCHEMA = Schema( NestedField(field_id=1, name="purchaseAmount", field_type=DoubleType(), required=False), - NestedField(field_id=2, name="PURCHASEAMOUNT", field_type=LongType(), required=False), - NestedField(field_id=3, name="PurchaseAmount", field_type=DoubleType(), required=False), + NestedField(field_id=2, name="itemCount", field_type=LongType(), required=False), + NestedField(field_id=3, name="discountRate", field_type=DoubleType(), required=False), ) MIXED_CASE_DATA = { "purchaseAmount": [19.99, 49.95, 9.99], - "PURCHASEAMOUNT": [100, 200, 300], - "PurchaseAmount": [29.99, 59.95, 14.99], + "itemCount": [2, 5, 1], + "discountRate": [0.1, 0.2, 0.0], } @@ -720,14 +720,14 @@ def __init__(self): super().__init__(dialect="datafusion") def transform(self, table, context): - return f'SELECT "purchaseAmount", "PURCHASEAMOUNT", "PurchaseAmount" FROM {to_sql_identifier(table)}' + return f'SELECT "purchaseAmount", "itemCount", "discountRate" FROM {to_sql_identifier(table)}' def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): """Transformer with mixed-case columns preserves original casing in Iceberg scan. - All three columns (purchaseAmount, PURCHASEAMOUNT, PurchaseAmount) lowercase - to the same string, so a lowercasing dialect would collapse them. + camelCase columns like purchaseAmount, itemCount, discountRate would lose + their casing under a lowercasing dialect, breaking Iceberg field lookups. """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -736,25 +736,25 @@ def test_iter_with_transformer_preserves_mixed_case_columns(tmp_path): catalog=catalog, database="db", table="tbl", - columns=["purchaseAmount", "PURCHASEAMOUNT"], + columns=["purchaseAmount", "itemCount"], context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) result = _materialize(loader) assert result.num_rows == 3 - # Verify scan received exact field names, not lowercased + # Verify scan received original camelCase names, not lowercased scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] assert "purchaseAmount" in selected - assert "PURCHASEAMOUNT" in selected - # A lowercasing dialect would produce "purchaseamount" for both — must not happen - assert selected.count("purchaseamount") == 0 + assert "itemCount" in selected + assert "purchaseamount" not in selected + assert "itemcount" not in selected def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): """Pushed-down filter column names preserve original mixed-case casing. - Filtering on PURCHASEAMOUNT must not cause it to appear as 'purchaseamount'. + Filtering on itemCount must not cause it to appear as 'itemcount' in the scan. """ catalog = _make_real_catalog(tmp_path, data=MIXED_CASE_DATA, iceberg_schema=MIXED_CASE_SCHEMA) mock_table = catalog.load_table.return_value @@ -763,15 +763,15 @@ def test_iter_with_transformer_preserves_mixed_case_filter_columns(tmp_path): catalog=catalog, database="db", table="tbl", - filters=col("PURCHASEAMOUNT") > 150, + filters=col("itemCount") > 3, context=DataLoaderContext(table_transformer=_MixedCaseTransformer()), ) _materialize(loader) scan_kwargs = mock_table.scan.call_args.kwargs selected = scan_kwargs["selected_fields"] - # All projected columns must use exact casing from the Iceberg schema - allowed = {"purchaseAmount", "PURCHASEAMOUNT", "PurchaseAmount"} + # All projected columns must use original camelCase from the Iceberg schema + allowed = {"purchaseAmount", "itemCount", "discountRate"} for field in selected: assert field in allowed, f"Unexpected field '{field}' — likely lowercased"