From c9dd6231aa3bd75637dc250fae600048082d202c Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Fri, 3 Apr 2026 23:05:01 +0000 Subject: [PATCH 01/17] Add jvm_args support to DataLoaderContext Allow users to pass JVM arguments (e.g. -Xmx512m, -verbose:gc, -D flags) to the JNI JVM used by the HDFS client. Adds planner_jvm_args and worker_jvm_args to DataLoaderContext so planner and worker processes can use different JVM configurations. --- .../src/openhouse/dataloader/_jvm.py | 15 ++++++++++++ .../dataloader/_table_scan_context.py | 12 +++++++++- .../src/openhouse/dataloader/data_loader.py | 13 ++++++++++ .../openhouse/dataloader/data_loader_split.py | 3 +++ .../dataloader/tests/test_data_loader.py | 18 ++++++++++++++ .../tests/test_data_loader_split.py | 24 +++++++++++++++++++ .../python/dataloader/tests/test_jvm.py | 24 +++++++++++++++++++ 7 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 integrations/python/dataloader/src/openhouse/dataloader/_jvm.py create mode 100644 integrations/python/dataloader/tests/test_jvm.py diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py new file mode 100644 index 000000000..06d68e1ae --- /dev/null +++ b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py @@ -0,0 +1,15 @@ +"""JVM configuration utilities for the HDFS client.""" + +import os + + +def apply_libhdfs_opts(jvm_args: str) -> None: + """Merge *jvm_args* into the JNI JVM options environment variable. + + Appends to any existing value. Must be called before the first + HDFS access in the current process (the JVM is started once and + reads these options only at startup). + """ + existing = os.environ.get('LIBHDFS_OPTS', '') + merged = f'{existing} {jvm_args}'.strip() if existing else jvm_args + os.environ['LIBHDFS_OPTS'] = merged diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py index e9e7e5f5e..b8a0dab25 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py @@ -16,6 +16,7 @@ def _unpickle_scan_context( projected_schema: Schema, row_filter: BooleanExpression, table_id: TableIdentifier, + worker_jvm_args: str | None = None, ) -> TableScanContext: return TableScanContext( table_metadata=table_metadata, @@ -23,6 +24,7 @@ def _unpickle_scan_context( projected_schema=projected_schema, row_filter=row_filter, table_id=table_id, + worker_jvm_args=worker_jvm_args, ) @@ -46,9 +48,17 @@ class TableScanContext: projected_schema: Schema table_id: TableIdentifier row_filter: BooleanExpression = AlwaysTrue() + worker_jvm_args: str | None = None def __reduce__(self) -> tuple: return ( _unpickle_scan_context, - (self.table_metadata, dict(self.io.properties), self.projected_schema, self.row_filter, self.table_id), + ( + self.table_metadata, + dict(self.io.properties), + self.projected_schema, + self.row_filter, + self.table_id, + self.worker_jvm_args, + ), ) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 3c43da7b0..fb35ff7c4 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -10,6 +10,7 @@ from requests import HTTPError from tenacity 
import Retrying, retry_if_exception, stop_after_attempt, wait_exponential +from openhouse.dataloader._jvm import apply_libhdfs_opts from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader._timer import log_duration from openhouse.dataloader.data_loader_split import DataLoaderSplit @@ -66,11 +67,19 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation + planner_jvm_args: JVM arguments used when starting the JNI JVM in the + process that loads table metadata and plans splits. + worker_jvm_args: JVM arguments used when starting the JNI JVM in worker + processes that read split data. If splits are processed in the same + process as the planner then only ``planner_jvm_args`` takes effect + because the JVM is already running by the time the split is materialized. """ execution_context: Mapping[str, str] | None = None table_transformer: TableTransformer | None = None udf_registry: UDFRegistry | None = None + planner_jvm_args: str | None = None + worker_jvm_args: str | None = None class OpenHouseDataLoader: @@ -112,6 +121,9 @@ def __init__( self._context = context or DataLoaderContext() self._max_attempts = max_attempts + if self._context.planner_jvm_args is not None: + apply_libhdfs_opts(self._context.planner_jvm_args) + @cached_property def _iceberg_table(self) -> Table: return _retry( @@ -215,6 +227,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: projected_schema=scan.projection(), row_filter=row_filter, table_id=self._table_id, + worker_jvm_args=self._context.worker_jvm_args, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index cd3630df3..5dd5a4515 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -9,6 +9,7 @@ from pyiceberg.io.pyarrow import ArrowScan from pyiceberg.table import FileScanTask +from openhouse.dataloader._jvm import apply_libhdfs_opts from openhouse.dataloader._table_scan_context import TableScanContext from openhouse.dataloader.filters import _quote_identifier from openhouse.dataloader.table_identifier import TableIdentifier @@ -78,6 +79,8 @@ def __iter__(self) -> Iterator[RecordBatch]: delete files, and partition spec lookups. 
""" ctx = self._scan_context + if ctx.worker_jvm_args is not None: + apply_libhdfs_opts(ctx.worker_jvm_args) arrow_scan = ArrowScan( table_metadata=ctx.table_metadata, io=ctx.io, diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index cb56147a0..f56f01bc8 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -745,3 +745,21 @@ def test_starts_with_wildcard_literals(tmp_path, filter_expr, expected_names): ) result = _materialize(loader) assert sorted(result.column(COL_NAME).to_pylist()) == sorted(expected_names) + + +# --- JVM args tests --- + + +def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): + """planner_jvm_args is applied to LIBHDFS_OPTS during __init__.""" + monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + catalog = _make_real_catalog(tmp_path) + + OpenHouseDataLoader( + catalog=catalog, + database="db", + table="tbl", + context=DataLoaderContext(planner_jvm_args="-Xmx256m"), + ) + + assert os.environ['LIBHDFS_OPTS'] == '-Xmx256m' diff --git a/integrations/python/dataloader/tests/test_data_loader_split.py b/integrations/python/dataloader/tests/test_data_loader_split.py index dd382b7cb..d0f311202 100644 --- a/integrations/python/dataloader/tests/test_data_loader_split.py +++ b/integrations/python/dataloader/tests/test_data_loader_split.py @@ -396,3 +396,27 @@ def test_transform_with_quoted_identifier(tmp_path): assert result.num_rows == 1 assert result.column("name").to_pylist() == ["MASKED"] + + +# --- JVM args tests --- + + +def test_worker_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): + """worker_jvm_args is applied to LIBHDFS_OPTS when iterating a split.""" + monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + + table = pa.table({"x": [1]}) + schema = Schema(NestedField(field_id=1, name="x", field_type=LongType(), required=False)) + + split = _create_test_split(tmp_path, table, FileFormat.PARQUET, schema) + split._scan_context = TableScanContext( + table_metadata=split._scan_context.table_metadata, + io=split._scan_context.io, + projected_schema=split._scan_context.projected_schema, + table_id=split._scan_context.table_id, + worker_jvm_args="-Xmx512m", + ) + + list(split) + + assert os.environ['LIBHDFS_OPTS'] == '-Xmx512m' diff --git a/integrations/python/dataloader/tests/test_jvm.py b/integrations/python/dataloader/tests/test_jvm.py new file mode 100644 index 000000000..66ef0407a --- /dev/null +++ b/integrations/python/dataloader/tests/test_jvm.py @@ -0,0 +1,24 @@ +import os + +import pytest + +from openhouse.dataloader._jvm import apply_libhdfs_opts + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch: pytest.MonkeyPatch) -> None: + """Remove LIBHDFS_OPTS before each test so tests are isolated.""" + monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + + +def test_sets_env_when_unset() -> None: + apply_libhdfs_opts('-Xmx512m') + assert os.environ['LIBHDFS_OPTS'] == '-Xmx512m' + + +def test_appends_to_existing(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv('LIBHDFS_OPTS', '-Xmx256m') + apply_libhdfs_opts('-verbose:gc') + assert os.environ['LIBHDFS_OPTS'] == '-Xmx256m -verbose:gc' + + From b342b26271f83c59115e0acea0f7bf8dd9608ebd Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 18:36:40 +0000 Subject: [PATCH 02/17] Fix ruff formatting --- .../dataloader/src/openhouse/dataloader/_jvm.py | 6 +++--- .../python/dataloader/tests/test_data_loader.py | 4 ++-- 
.../dataloader/tests/test_data_loader_split.py | 4 ++-- integrations/python/dataloader/tests/test_jvm.py | 14 ++++++-------- 4 files changed, 13 insertions(+), 15 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py index 06d68e1ae..da6527aa7 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py @@ -10,6 +10,6 @@ def apply_libhdfs_opts(jvm_args: str) -> None: HDFS access in the current process (the JVM is started once and reads these options only at startup). """ - existing = os.environ.get('LIBHDFS_OPTS', '') - merged = f'{existing} {jvm_args}'.strip() if existing else jvm_args - os.environ['LIBHDFS_OPTS'] = merged + existing = os.environ.get("LIBHDFS_OPTS", "") + merged = f"{existing} {jvm_args}".strip() if existing else jvm_args + os.environ["LIBHDFS_OPTS"] = merged diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index f56f01bc8..05ed6619f 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -752,7 +752,7 @@ def test_starts_with_wildcard_literals(tmp_path, filter_expr, expected_names): def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): """planner_jvm_args is applied to LIBHDFS_OPTS during __init__.""" - monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + monkeypatch.delenv("LIBHDFS_OPTS", raising=False) catalog = _make_real_catalog(tmp_path) OpenHouseDataLoader( @@ -762,4 +762,4 @@ def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): context=DataLoaderContext(planner_jvm_args="-Xmx256m"), ) - assert os.environ['LIBHDFS_OPTS'] == '-Xmx256m' + assert os.environ["LIBHDFS_OPTS"] == "-Xmx256m" diff --git a/integrations/python/dataloader/tests/test_data_loader_split.py b/integrations/python/dataloader/tests/test_data_loader_split.py index d0f311202..76e839649 100644 --- a/integrations/python/dataloader/tests/test_data_loader_split.py +++ b/integrations/python/dataloader/tests/test_data_loader_split.py @@ -403,7 +403,7 @@ def test_transform_with_quoted_identifier(tmp_path): def test_worker_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): """worker_jvm_args is applied to LIBHDFS_OPTS when iterating a split.""" - monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + monkeypatch.delenv("LIBHDFS_OPTS", raising=False) table = pa.table({"x": [1]}) schema = Schema(NestedField(field_id=1, name="x", field_type=LongType(), required=False)) @@ -419,4 +419,4 @@ def test_worker_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): list(split) - assert os.environ['LIBHDFS_OPTS'] == '-Xmx512m' + assert os.environ["LIBHDFS_OPTS"] == "-Xmx512m" diff --git a/integrations/python/dataloader/tests/test_jvm.py b/integrations/python/dataloader/tests/test_jvm.py index 66ef0407a..ef83bc697 100644 --- a/integrations/python/dataloader/tests/test_jvm.py +++ b/integrations/python/dataloader/tests/test_jvm.py @@ -8,17 +8,15 @@ @pytest.fixture(autouse=True) def _clean_env(monkeypatch: pytest.MonkeyPatch) -> None: """Remove LIBHDFS_OPTS before each test so tests are isolated.""" - monkeypatch.delenv('LIBHDFS_OPTS', raising=False) + monkeypatch.delenv("LIBHDFS_OPTS", raising=False) def test_sets_env_when_unset() -> None: - apply_libhdfs_opts('-Xmx512m') - assert os.environ['LIBHDFS_OPTS'] == '-Xmx512m' + apply_libhdfs_opts("-Xmx512m") + assert 
os.environ["LIBHDFS_OPTS"] == "-Xmx512m" def test_appends_to_existing(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setenv('LIBHDFS_OPTS', '-Xmx256m') - apply_libhdfs_opts('-verbose:gc') - assert os.environ['LIBHDFS_OPTS'] == '-Xmx256m -verbose:gc' - - + monkeypatch.setenv("LIBHDFS_OPTS", "-Xmx256m") + apply_libhdfs_opts("-verbose:gc") + assert os.environ["LIBHDFS_OPTS"] == "-Xmx256m -verbose:gc" From 8c1ee7e8be373f61b216c9da2c254a19672f2231 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 19:10:49 +0000 Subject: [PATCH 03/17] Extract LIBHDFS_OPTS env var name to constant --- .../python/dataloader/src/openhouse/dataloader/_jvm.py | 7 +++++-- .../python/dataloader/tests/test_data_loader.py | 6 ++++-- .../python/dataloader/tests/test_data_loader_split.py | 6 ++++-- integrations/python/dataloader/tests/test_jvm.py | 10 +++++----- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py index da6527aa7..a993843ba 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py @@ -2,6 +2,9 @@ import os +LIBHDFS_OPTS_ENV = "LIBHDFS_OPTS" +"""Environment variable read by libhdfs when starting the JNI JVM.""" + def apply_libhdfs_opts(jvm_args: str) -> None: """Merge *jvm_args* into the JNI JVM options environment variable. @@ -10,6 +13,6 @@ def apply_libhdfs_opts(jvm_args: str) -> None: HDFS access in the current process (the JVM is started once and reads these options only at startup). """ - existing = os.environ.get("LIBHDFS_OPTS", "") + existing = os.environ.get(LIBHDFS_OPTS_ENV, "") merged = f"{existing} {jvm_args}".strip() if existing else jvm_args - os.environ["LIBHDFS_OPTS"] = merged + os.environ[LIBHDFS_OPTS_ENV] = merged diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 05ed6619f..2b19dbd3c 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -752,7 +752,9 @@ def test_starts_with_wildcard_literals(tmp_path, filter_expr, expected_names): def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): """planner_jvm_args is applied to LIBHDFS_OPTS during __init__.""" - monkeypatch.delenv("LIBHDFS_OPTS", raising=False) + from openhouse.dataloader._jvm import LIBHDFS_OPTS_ENV + + monkeypatch.delenv(LIBHDFS_OPTS_ENV, raising=False) catalog = _make_real_catalog(tmp_path) OpenHouseDataLoader( @@ -762,4 +764,4 @@ def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): context=DataLoaderContext(planner_jvm_args="-Xmx256m"), ) - assert os.environ["LIBHDFS_OPTS"] == "-Xmx256m" + assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx256m" diff --git a/integrations/python/dataloader/tests/test_data_loader_split.py b/integrations/python/dataloader/tests/test_data_loader_split.py index 76e839649..306eb3f84 100644 --- a/integrations/python/dataloader/tests/test_data_loader_split.py +++ b/integrations/python/dataloader/tests/test_data_loader_split.py @@ -403,7 +403,9 @@ def test_transform_with_quoted_identifier(tmp_path): def test_worker_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): """worker_jvm_args is applied to LIBHDFS_OPTS when iterating a split.""" - monkeypatch.delenv("LIBHDFS_OPTS", raising=False) + from openhouse.dataloader._jvm import LIBHDFS_OPTS_ENV + + 
monkeypatch.delenv(LIBHDFS_OPTS_ENV, raising=False) table = pa.table({"x": [1]}) schema = Schema(NestedField(field_id=1, name="x", field_type=LongType(), required=False)) @@ -419,4 +421,4 @@ def test_worker_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): list(split) - assert os.environ["LIBHDFS_OPTS"] == "-Xmx512m" + assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx512m" diff --git a/integrations/python/dataloader/tests/test_jvm.py b/integrations/python/dataloader/tests/test_jvm.py index ef83bc697..09702dd8f 100644 --- a/integrations/python/dataloader/tests/test_jvm.py +++ b/integrations/python/dataloader/tests/test_jvm.py @@ -2,21 +2,21 @@ import pytest -from openhouse.dataloader._jvm import apply_libhdfs_opts +from openhouse.dataloader._jvm import LIBHDFS_OPTS_ENV, apply_libhdfs_opts @pytest.fixture(autouse=True) def _clean_env(monkeypatch: pytest.MonkeyPatch) -> None: """Remove LIBHDFS_OPTS before each test so tests are isolated.""" - monkeypatch.delenv("LIBHDFS_OPTS", raising=False) + monkeypatch.delenv(LIBHDFS_OPTS_ENV, raising=False) def test_sets_env_when_unset() -> None: apply_libhdfs_opts("-Xmx512m") - assert os.environ["LIBHDFS_OPTS"] == "-Xmx512m" + assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx512m" def test_appends_to_existing(monkeypatch: pytest.MonkeyPatch) -> None: - monkeypatch.setenv("LIBHDFS_OPTS", "-Xmx256m") + monkeypatch.setenv(LIBHDFS_OPTS_ENV, "-Xmx256m") apply_libhdfs_opts("-verbose:gc") - assert os.environ["LIBHDFS_OPTS"] == "-Xmx256m -verbose:gc" + assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx256m -verbose:gc" From ed157d502597f75fad8655827e4d271d65e38235 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 19:15:39 +0000 Subject: [PATCH 04/17] Defer UDF session creation until after first HDFS read Ensures the JVM starts with worker_jvm_args before any UDF registration code can trigger JNI. --- .../src/openhouse/dataloader/data_loader_split.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index 5dd5a4515..f902f1cfd 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -93,9 +93,16 @@ def __iter__(self) -> Iterator[RecordBatch]: if self._transform_sql is None: yield from batches else: + # Materialize the first batch before creating the transform session + # so that the HDFS JVM starts (and picks up worker_jvm_args) before + # any UDF registration code can trigger JNI. + batch_iter = iter(batches) + first = next(batch_iter, None) session = _create_transform_session(self._scan_context.table_id, self._udf_registry) - for batch in batches: - yield from self._apply_transform(session, batch) + if first is not None: + yield from self._apply_transform(session, first) + for batch in batch_iter: + yield from self._apply_transform(session, batch) def _apply_transform(self, session: SessionContext, batch: RecordBatch) -> Iterator[RecordBatch]: """Execute the transform SQL against a single RecordBatch.""" From cdca8694d47bea489bb6f3205af5ff5f837a7e23 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 22:49:51 +0000 Subject: [PATCH 05/17] Add integration test to verify JVM honors planner_jvm_args Pass -Xmx128m via DataLoaderContext on the first DataLoader and verify at the end that the JVM's MaxHeapSize matches 128 MB. 
--- .../dataloader/tests/integration_tests.py | 63 ++++++++++++++----- 1 file changed, 49 insertions(+), 14 deletions(-) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index adfc920c1..398e13f77 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -8,6 +8,7 @@ import logging import os import sys +import tempfile import time import pyarrow as pa @@ -15,7 +16,7 @@ import requests from pyiceberg.exceptions import NoSuchTableError -from openhouse.dataloader import OpenHouseDataLoader +from openhouse.dataloader import DataLoaderContext, OpenHouseDataLoader from openhouse.dataloader.catalog import OpenHouseCatalog from openhouse.dataloader.filters import col @@ -142,11 +143,20 @@ def read_token() -> str: properties={"DEFAULT_SCHEME": "hdfs", "DEFAULT_NETLOC": HDFS_NETLOC}, ) + # Set jvm_args before any DataLoader is created so LIBHDFS_OPTS is in + # place when the JVM starts. We capture both stdout and stderr to a + # log file because -XX:+PrintFlagsFinal may write to either fd. + jvm_log_fd, jvm_log = tempfile.mkstemp(suffix=".log") + os.close(jvm_log_fd) + ctx = DataLoaderContext(planner_jvm_args="-Xmx128m -XX:+PrintFlagsFinal") + livy = LivySession(LIVY_URL, token_str) try: # 1. Nonexistent table raises NoSuchTableError with pytest.raises(NoSuchTableError): - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table="nonexistent_table") + loader = OpenHouseDataLoader( + catalog=catalog, database=DATABASE_ID, table="nonexistent_table", context=ctx + ) _read_all(loader) print("PASS: nonexistent table raised NoSuchTableError") @@ -155,19 +165,34 @@ def read_token() -> str: f"CREATE TABLE {FQTN} ({CREATE_COLUMNS}) USING iceberg TBLPROPERTIES ('itest.custom-key' = 'custom-value')" ) try: - loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID) - assert list(loader) == [], "Expected no splits for empty table" - assert loader.snapshot_id is None, "Expected no snapshot for empty table" - assert loader.table_properties.get("itest.custom-key") == "custom-value" + # Capture stdout+stderr from here through the first HDFS read + # so we can verify -XX:+PrintFlagsFinal output at the end. + # The JVM starts during the first table load and prints flags then. + saved_stdout = os.dup(1) + saved_stderr = os.dup(2) + log_fd = os.open(jvm_log, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644) + os.dup2(log_fd, 1) + os.dup2(log_fd, 2) + os.close(log_fd) + try: + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID) + assert list(loader) == [], "Expected no splits for empty table" + assert loader.snapshot_id is None, "Expected no snapshot for empty table" + assert loader.table_properties.get("itest.custom-key") == "custom-value" + + # 3. Write data via Spark + livy.execute(f"INSERT INTO {FQTN} VALUES (1, 'alice', 1.1), (2, 'bob', 2.2), (3, 'charlie', 3.3)") + snap1 = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID).snapshot_id + assert snap1 is not None + + # 4. Read all data + result = _read_all(OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID)) + finally: + os.dup2(saved_stdout, 1) + os.close(saved_stdout) + os.dup2(saved_stderr, 2) + os.close(saved_stderr) print("PASS: empty table returned no splits and custom property is accessible") - - # 3. 
Write data via Spark -        livy.execute(f"INSERT INTO {FQTN} VALUES (1, 'alice', 1.1), (2, 'bob', 2.2), (3, 'charlie', 3.3)") -        snap1 = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID).snapshot_id -        assert snap1 is not None - -        # 4. Read all data -        result = _read_all(OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table=TABLE_ID)) assert result.num_rows == 3 assert result.column(COL_ID).to_pylist() == [1, 2, 3] assert result.column(COL_NAME).to_pylist() == ["alice", "bob", "charlie"] @@ -229,3 +254,13 @@ def read_token() -> str: print("All integration tests passed") finally: livy.close() + +    # Verify planner_jvm_args were honored by the JVM +    with open(jvm_log) as f: +        jvm_output = f.read() +    os.unlink(jvm_log) +    assert "MaxHeapSize" in jvm_output, "JVM did not print flags — jvm_args not honored" +    assert "134217728" in jvm_output, "MaxHeapSize not 128m (134217728). JVM flags:\n" + "\n".join( +        line for line in jvm_output.splitlines() if "MaxHeapSize" in line +    ) +    print("PASS: planner_jvm_args honored by JVM (MaxHeapSize=128m)") From 2807d7f037882738bf0ac6e021b1ed4e6e14e327 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 23:56:01 +0000 Subject: [PATCH 06/17] Verify both planner and worker jvm_args in integration tests - Set -Xmx127m via planner_jvm_args on the first DataLoader and verify the planner JVM's MaxHeapSize is capped at 128m. - Spawn a child process to materialize a split with -Xmx254m via worker_jvm_args and verify the worker JVM gets a larger, distinct heap size. --- .../dataloader/tests/integration_tests.py | 76 +++++++++++++++++-- 1 file changed, 70 insertions(+), 6 deletions(-) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index 398e13f77..d85115580 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -6,6 +6,7 @@ """ import logging +import multiprocessing import os import sys import tempfile import time import pyarrow as pa @@ -108,6 +109,38 @@ def close(self) -> None: requests.delete(self._session_url, headers=HEADERS, timeout=REQUEST_TIMEOUT) +def _parse_max_heap_bytes(jvm_output: str) -> int: +    """Extract MaxHeapSize value in bytes from -XX:+PrintFlagsFinal output.""" +    for line in jvm_output.splitlines(): +        parts = line.split() +        if len(parts) >= 4 and parts[1] == "MaxHeapSize": +            return int(parts[3]) +    raise ValueError("MaxHeapSize not found in JVM output") + + +def _materialize_split_in_child(split, jvm_log_path): +    """Materialize a single split in this process, capturing stdout+stderr to *jvm_log_path*. + +    Intended to run via multiprocessing so the child gets a fresh JVM that +    picks up worker_jvm_args from LIBHDFS_OPTS. +    """ +    saved_stdout = os.dup(1) +    saved_stderr = os.dup(2) +    log_fd = os.open(jvm_log_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o644) +    os.dup2(log_fd, 1) +    os.dup2(log_fd, 2) +    os.close(log_fd) +    try: +        batches = list(split) +        num_rows = sum(b.num_rows for b in batches) +    finally: +        os.dup2(saved_stdout, 1) +        os.close(saved_stdout) +        os.dup2(saved_stderr, 2) +        os.close(saved_stderr) +    print(f" child process read {num_rows} rows from split") + + def _read_all(loader: OpenHouseDataLoader) -> pa.Table: """Read all data from a DataLoader and return as a sorted PyArrow table.""" batches = [batch for split in loader for batch in split] @@ -148,7 +181,7 @@ def read_token() -> str: # log file because -XX:+PrintFlagsFinal may write to either fd. 
jvm_log_fd, jvm_log = tempfile.mkstemp(suffix=".log") os.close(jvm_log_fd) - ctx = DataLoaderContext(planner_jvm_args="-Xmx128m -XX:+PrintFlagsFinal") + ctx = DataLoaderContext(planner_jvm_args="-Xmx127m -XX:+PrintFlagsFinal") livy = LivySession(LIVY_URL, token_str) try: @@ -248,6 +281,26 @@ def read_token() -> str: list(loader) print("PASS: invalid snapshot_id raised ValueError") + # 8. Materialize a split in a child process with worker_jvm_args. + # The child gets a fresh JVM, so -Xmx254m takes effect there + # independently of the planner's -Xmx127m. + worker_ctx = DataLoaderContext(worker_jvm_args="-Xmx254m -XX:+PrintFlagsFinal") + worker_loader = OpenHouseDataLoader( + catalog=catalog, database=DATABASE_ID, table=TABLE_ID, context=worker_ctx + ) + splits = list(worker_loader) + assert splits, "Expected at least one split" + worker_jvm_log_fd, worker_jvm_log = tempfile.mkstemp(suffix=".log") + os.close(worker_jvm_log_fd) + spawn_ctx = multiprocessing.get_context("spawn") + proc = spawn_ctx.Process( + target=_materialize_split_in_child, args=(splits[0], worker_jvm_log) + ) + proc.start() + proc.join(timeout=120) + assert proc.exitcode == 0, f"Child process failed with exit code {proc.exitcode}" + print("PASS: worker_jvm_args split materialized in child process") + finally: livy.execute(f"DROP TABLE IF EXISTS {FQTN}") @@ -255,12 +308,23 @@ def read_token() -> str: finally: livy.close() - # Verify planner_jvm_args were honored by the JVM + # Verify planner_jvm_args: requested 127m, JVM may round up but must be close with open(jvm_log) as f: jvm_output = f.read() os.unlink(jvm_log) - assert "MaxHeapSize" in jvm_output, "JVM did not print flags — jvm_args not honored" - assert "134217728" in jvm_output, "MaxHeapSize not 128m (134217728). JVM flags:\n" + "\n".join( - line for line in jvm_output.splitlines() if "MaxHeapSize" in line + assert "MaxHeapSize" in jvm_output, "JVM did not print flags — planner jvm_args not honored" + planner_heap = _parse_max_heap_bytes(jvm_output) + assert planner_heap <= 128 * 1024 * 1024, f"Planner MaxHeapSize {planner_heap} exceeds 128m — -Xmx127m not honored" + print(f"PASS: planner_jvm_args honored by JVM (MaxHeapSize={planner_heap})") + + # Verify worker_jvm_args: requested 254m, JVM may round up but must differ from planner + with open(worker_jvm_log) as f: + worker_jvm_output = f.read() + os.unlink(worker_jvm_log) + assert "MaxHeapSize" in worker_jvm_output, "Worker JVM did not print flags — worker jvm_args not honored" + worker_heap = _parse_max_heap_bytes(worker_jvm_output) + assert worker_heap <= 256 * 1024 * 1024, f"Worker MaxHeapSize {worker_heap} exceeds 256m — -Xmx254m not honored" + assert worker_heap > planner_heap, ( + f"Worker MaxHeapSize ({worker_heap}) should be larger than planner ({planner_heap})" ) - print("PASS: planner_jvm_args honored by JVM (MaxHeapSize=128m)") + print(f"PASS: worker_jvm_args honored by child JVM (MaxHeapSize={worker_heap})") From 2b47f6336d8aef9297cc74fa392ccc201c28e399 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Mon, 6 Apr 2026 23:58:12 +0000 Subject: [PATCH 07/17] Move jvm_args verification before 'All tests passed' message Extract _assert_jvm_heap helper and run planner/worker heap checks before printing the final success message. Also fix ruff formatting. 
--- .../dataloader/tests/integration_tests.py | 51 +++++++++---------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index d85115580..c66faebd5 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -118,6 +118,19 @@ def _parse_max_heap_bytes(jvm_output: str) -> int: raise ValueError("MaxHeapSize not found in JVM output") +def _assert_jvm_heap(log_path: str, requested_mb: int, upper_bound_mb: int, label: str) -> int: + """Read a JVM flags log file, assert MaxHeapSize <= upper_bound, and return the actual value.""" + with open(log_path) as f: + output = f.read() + os.unlink(log_path) + assert "MaxHeapSize" in output, f"{label} JVM did not print flags — jvm_args not honored" + heap = _parse_max_heap_bytes(output) + assert heap <= upper_bound_mb * 1024 * 1024, ( + f"{label} MaxHeapSize {heap} exceeds {upper_bound_mb}m — -Xmx{requested_mb}m not honored" + ) + return heap + + def _materialize_split_in_child(split, jvm_log_path): """Materialize a single split in this process, capturing stdout+stderr to *jvm_log_path*. @@ -187,9 +200,7 @@ def read_token() -> str: try: # 1. Nonexistent table raises NoSuchTableError with pytest.raises(NoSuchTableError): - loader = OpenHouseDataLoader( - catalog=catalog, database=DATABASE_ID, table="nonexistent_table", context=ctx - ) + loader = OpenHouseDataLoader(catalog=catalog, database=DATABASE_ID, table="nonexistent_table", context=ctx) _read_all(loader) print("PASS: nonexistent table raised NoSuchTableError") @@ -293,9 +304,7 @@ def read_token() -> str: worker_jvm_log_fd, worker_jvm_log = tempfile.mkstemp(suffix=".log") os.close(worker_jvm_log_fd) spawn_ctx = multiprocessing.get_context("spawn") - proc = spawn_ctx.Process( - target=_materialize_split_in_child, args=(splits[0], worker_jvm_log) - ) + proc = spawn_ctx.Process(target=_materialize_split_in_child, args=(splits[0], worker_jvm_log)) proc.start() proc.join(timeout=120) assert proc.exitcode == 0, f"Child process failed with exit code {proc.exitcode}" @@ -304,27 +313,15 @@ def read_token() -> str: finally: livy.execute(f"DROP TABLE IF EXISTS {FQTN}") + # Verify planner and worker jvm_args were honored by their respective JVMs + planner_heap = _assert_jvm_heap(jvm_log, requested_mb=127, upper_bound_mb=128, label="Planner") + print(f"PASS: planner_jvm_args honored by JVM (MaxHeapSize={planner_heap})") + worker_heap = _assert_jvm_heap(worker_jvm_log, requested_mb=254, upper_bound_mb=256, label="Worker") + assert worker_heap > planner_heap, ( + f"Worker MaxHeapSize ({worker_heap}) should be larger than planner ({planner_heap})" + ) + print(f"PASS: worker_jvm_args honored by child JVM (MaxHeapSize={worker_heap})") + print("All integration tests passed") finally: livy.close() - - # Verify planner_jvm_args: requested 127m, JVM may round up but must be close - with open(jvm_log) as f: - jvm_output = f.read() - os.unlink(jvm_log) - assert "MaxHeapSize" in jvm_output, "JVM did not print flags — planner jvm_args not honored" - planner_heap = _parse_max_heap_bytes(jvm_output) - assert planner_heap <= 128 * 1024 * 1024, f"Planner MaxHeapSize {planner_heap} exceeds 128m — -Xmx127m not honored" - print(f"PASS: planner_jvm_args honored by JVM (MaxHeapSize={planner_heap})") - - # Verify worker_jvm_args: requested 254m, JVM may round up but must differ from planner - with open(worker_jvm_log) as f: - 
worker_jvm_output = f.read() - os.unlink(worker_jvm_log) - assert "MaxHeapSize" in worker_jvm_output, "Worker JVM did not print flags — worker jvm_args not honored" - worker_heap = _parse_max_heap_bytes(worker_jvm_output) - assert worker_heap <= 256 * 1024 * 1024, f"Worker MaxHeapSize {worker_heap} exceeds 256m — -Xmx254m not honored" - assert worker_heap > planner_heap, ( - f"Worker MaxHeapSize ({worker_heap}) should be larger than planner ({planner_heap})" - ) - print(f"PASS: worker_jvm_args honored by child JVM (MaxHeapSize={worker_heap})") From 26cfab70dcb310c0ebd0987f46ae9eb1abc0eb21 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 17:04:32 +0000 Subject: [PATCH 08/17] Clarify jvm_args docs: JNI-only, once-per-process semantics --- .../src/openhouse/dataloader/data_loader.py | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index fb35ff7c4..572a3750e 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -67,12 +67,19 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation - planner_jvm_args: JVM arguments used when starting the JNI JVM in the - process that loads table metadata and plans splits. - worker_jvm_args: JVM arguments used when starting the JNI JVM in worker - processes that read split data. If splits are processed in the same - process as the planner then only ``planner_jvm_args`` takes effect - because the JVM is already running by the time the split is materialized. + planner_jvm_args: JVM arguments (e.g. ``-Xmx2g``) applied when the JNI + JVM is created in the planner process — the process that loads table + metadata and plans splits. Only relevant when the underlying storage + requires JNI (e.g. HDFS via libhdfs). The JVM is created once per + process; if another library has already started a JVM these arguments + will have no effect. + worker_jvm_args: JVM arguments applied when the JNI JVM is created in + worker processes that materialize splits. Same caveats as + ``planner_jvm_args``: only relevant for JNI-based storage and + only honored if the JVM has not already been started in the worker + process. When splits are materialized in the same process as the + planner, only ``planner_jvm_args`` takes effect because the JVM is + already running. """ execution_context: Mapping[str, str] | None = None From 810a8b99aa305c7b06fae88c1b9a4992304ad343 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 17:42:17 +0000 Subject: [PATCH 09/17] Extract JvmConfig from DataLoaderContext Move planner_jvm_args and worker_jvm_args into a JvmConfig dataclass to reduce clutter on DataLoaderContext for non-JNI use cases. 
Usage: DataLoaderContext(jvm=JvmConfig(planner_args="-Xmx2g")) --- .../src/openhouse/dataloader/__init__.py | 3 +- .../src/openhouse/dataloader/data_loader.py | 46 +++++++++++-------- .../dataloader/tests/integration_tests.py | 6 +-- .../dataloader/tests/test_data_loader.py | 6 +-- 4 files changed, 36 insertions(+), 25 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/__init__.py b/integrations/python/dataloader/src/openhouse/dataloader/__init__.py index 947de1801..df266c799 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/__init__.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/__init__.py @@ -1,13 +1,14 @@ from importlib.metadata import version from openhouse.dataloader.catalog import OpenHouseCatalog, OpenHouseCatalogError -from openhouse.dataloader.data_loader import DataLoaderContext, OpenHouseDataLoader +from openhouse.dataloader.data_loader import DataLoaderContext, JvmConfig, OpenHouseDataLoader from openhouse.dataloader.filters import always_true, col __version__ = version("openhouse.dataloader") __all__ = [ "OpenHouseDataLoader", "DataLoaderContext", + "JvmConfig", "OpenHouseCatalog", "OpenHouseCatalogError", "always_true", diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 572a3750e..c3df24487 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -56,6 +56,29 @@ def _retry[T](fn: Callable[[], T], label: str, max_attempts: int) -> T: raise AssertionError("unreachable") # pragma: no cover +@dataclass +class JvmConfig: + """JVM arguments for JNI-based storage (e.g. HDFS via libhdfs). + + The JVM is created once per process. If another library has already + started a JVM these arguments will have no effect. + + Args: + planner_args: JVM arguments (e.g. ``-Xmx2g``) applied when the JNI + JVM is created in the planner process — the process that loads + table metadata and plans splits. + worker_args: JVM arguments applied when the JNI JVM is created in + worker processes that materialize splits. Only honored if the + JVM has not already been started in the worker process. When + splits are materialized in the same process as the planner, + only ``planner_args`` takes effect because the JVM is already + running. + """ + + planner_args: str | None = None + worker_args: str | None = None + + @dataclass class DataLoaderContext: """Context and customization for the DataLoader. @@ -67,26 +90,13 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation - planner_jvm_args: JVM arguments (e.g. ``-Xmx2g``) applied when the JNI - JVM is created in the planner process — the process that loads table - metadata and plans splits. Only relevant when the underlying storage - requires JNI (e.g. HDFS via libhdfs). The JVM is created once per - process; if another library has already started a JVM these arguments - will have no effect. - worker_jvm_args: JVM arguments applied when the JNI JVM is created in - worker processes that materialize splits. Same caveats as - ``planner_jvm_args``: only relevant for JNI-based storage and - only honored if the JVM has not already been started in the worker - process. 
When splits are materialized in the same process as the - planner, only ``planner_jvm_args`` takes effect because the JVM is - already running. + jvm: JVM configuration for JNI-based storage (e.g. HDFS). See :class:`JvmConfig`. """ execution_context: Mapping[str, str] | None = None table_transformer: TableTransformer | None = None udf_registry: UDFRegistry | None = None - planner_jvm_args: str | None = None - worker_jvm_args: str | None = None + jvm: JvmConfig | None = None class OpenHouseDataLoader: @@ -128,8 +138,8 @@ def __init__( self._context = context or DataLoaderContext() self._max_attempts = max_attempts - if self._context.planner_jvm_args is not None: - apply_libhdfs_opts(self._context.planner_jvm_args) + if self._context.jvm is not None and self._context.jvm.planner_args is not None: + apply_libhdfs_opts(self._context.jvm.planner_args) @cached_property def _iceberg_table(self) -> Table: @@ -234,7 +244,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: projected_schema=scan.projection(), row_filter=row_filter, table_id=self._table_id, - worker_jvm_args=self._context.worker_jvm_args, + worker_jvm_args=self._context.jvm.worker_args if self._context.jvm else None, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index c66faebd5..9afe19d4f 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -17,7 +17,7 @@ import requests from pyiceberg.exceptions import NoSuchTableError -from openhouse.dataloader import DataLoaderContext, OpenHouseDataLoader +from openhouse.dataloader import DataLoaderContext, JvmConfig, OpenHouseDataLoader from openhouse.dataloader.catalog import OpenHouseCatalog from openhouse.dataloader.filters import col @@ -194,7 +194,7 @@ def read_token() -> str: # log file because -XX:+PrintFlagsFinal may write to either fd. jvm_log_fd, jvm_log = tempfile.mkstemp(suffix=".log") os.close(jvm_log_fd) - ctx = DataLoaderContext(planner_jvm_args="-Xmx127m -XX:+PrintFlagsFinal") + ctx = DataLoaderContext(jvm=JvmConfig(planner_args="-Xmx127m -XX:+PrintFlagsFinal")) livy = LivySession(LIVY_URL, token_str) try: @@ -295,7 +295,7 @@ def read_token() -> str: # 8. Materialize a split in a child process with worker_jvm_args. # The child gets a fresh JVM, so -Xmx254m takes effect there # independently of the planner's -Xmx127m. 
- worker_ctx = DataLoaderContext(worker_jvm_args="-Xmx254m -XX:+PrintFlagsFinal") + worker_ctx = DataLoaderContext(jvm=JvmConfig(worker_args="-Xmx254m -XX:+PrintFlagsFinal")) worker_loader = OpenHouseDataLoader( catalog=catalog, database=DATABASE_ID, table=TABLE_ID, context=worker_ctx ) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index 2b19dbd3c..efd7ad68e 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -15,7 +15,7 @@ from requests import ConnectionError as RequestsConnectionError from requests import HTTPError, Response, Timeout -from openhouse.dataloader import DataLoaderContext, OpenHouseDataLoader, __version__ +from openhouse.dataloader import DataLoaderContext, JvmConfig, OpenHouseDataLoader, __version__ from openhouse.dataloader.data_loader_split import DataLoaderSplit, to_sql_identifier from openhouse.dataloader.filters import col from openhouse.dataloader.table_transformer import TableTransformer @@ -751,7 +751,7 @@ def test_starts_with_wildcard_literals(tmp_path, filter_expr, expected_names): def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): -    """planner_jvm_args is applied to LIBHDFS_OPTS during __init__.""" +    """JvmConfig.planner_args is applied to LIBHDFS_OPTS during __init__.""" from openhouse.dataloader._jvm import LIBHDFS_OPTS_ENV monkeypatch.delenv(LIBHDFS_OPTS_ENV, raising=False) catalog = _make_real_catalog(tmp_path) OpenHouseDataLoader( catalog=catalog, database="db", table="tbl", -        context=DataLoaderContext(planner_jvm_args="-Xmx256m"), +        context=DataLoaderContext(jvm=JvmConfig(planner_args="-Xmx256m")), ) assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx256m" From 23cc91ac2253afff4eda0da3fb0e5ef2cfa9fe35 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 17:42:47 +0000 Subject: [PATCH 10/17] Fix DataLoaderContext.jvm docstring wording --- .../python/dataloader/src/openhouse/dataloader/data_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index c3df24487..532cb5f51 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -90,7 +90,7 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation -    jvm: JVM configuration for JNI-based storage (e.g. HDFS). See :class:`JvmConfig`. +    jvm: JVM configuration for JNI-based storage access (e.g. HDFS). See :class:`JvmConfig`. 
""" execution_context: Mapping[str, str] | None = None From 5451c15263747d84ff54a8ca14235b30b1f37f97 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 17:45:43 +0000 Subject: [PATCH 11/17] Rename jvm field to jvm_config on DataLoaderContext --- .../dataloader/src/openhouse/dataloader/data_loader.py | 10 +++++----- .../python/dataloader/tests/integration_tests.py | 4 ++-- .../python/dataloader/tests/test_data_loader.py | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 532cb5f51..caf30fd6c 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -90,13 +90,13 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation - jvm: JVM configuration for JNI-based storage access (e.g. HDFS). See :class:`JvmConfig`. + jvm_config: JVM configuration for JNI-based storage access (e.g. HDFS). See :class:`JvmConfig`. """ execution_context: Mapping[str, str] | None = None table_transformer: TableTransformer | None = None udf_registry: UDFRegistry | None = None - jvm: JvmConfig | None = None + jvm_config: JvmConfig | None = None class OpenHouseDataLoader: @@ -138,8 +138,8 @@ def __init__( self._context = context or DataLoaderContext() self._max_attempts = max_attempts - if self._context.jvm is not None and self._context.jvm.planner_args is not None: - apply_libhdfs_opts(self._context.jvm.planner_args) + if self._context.jvm_config is not None and self._context.jvm_config.planner_args is not None: + apply_libhdfs_opts(self._context.jvm_config.planner_args) @cached_property def _iceberg_table(self) -> Table: @@ -244,7 +244,7 @@ def __iter__(self) -> Iterator[DataLoaderSplit]: projected_schema=scan.projection(), row_filter=row_filter, table_id=self._table_id, - worker_jvm_args=self._context.jvm.worker_args if self._context.jvm else None, + worker_jvm_args=self._context.jvm_config.worker_args if self._context.jvm_config else None, ) # plan_files() materializes all tasks at once (PyIceberg doesn't support streaming) diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index 9afe19d4f..2f5755b90 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -194,7 +194,7 @@ def read_token() -> str: # log file because -XX:+PrintFlagsFinal may write to either fd. jvm_log_fd, jvm_log = tempfile.mkstemp(suffix=".log") os.close(jvm_log_fd) - ctx = DataLoaderContext(jvm=JvmConfig(planner_args="-Xmx127m -XX:+PrintFlagsFinal")) + ctx = DataLoaderContext(jvm_config=JvmConfig(planner_args="-Xmx127m -XX:+PrintFlagsFinal")) livy = LivySession(LIVY_URL, token_str) try: @@ -295,7 +295,7 @@ def read_token() -> str: # 8. Materialize a split in a child process with worker_jvm_args. # The child gets a fresh JVM, so -Xmx254m takes effect there # independently of the planner's -Xmx127m. 
- worker_ctx = DataLoaderContext(jvm=JvmConfig(worker_args="-Xmx254m -XX:+PrintFlagsFinal")) + worker_ctx = DataLoaderContext(jvm_config=JvmConfig(worker_args="-Xmx254m -XX:+PrintFlagsFinal")) worker_loader = OpenHouseDataLoader( catalog=catalog, database=DATABASE_ID, table=TABLE_ID, context=worker_ctx ) diff --git a/integrations/python/dataloader/tests/test_data_loader.py b/integrations/python/dataloader/tests/test_data_loader.py index efd7ad68e..4ec0edee5 100644 --- a/integrations/python/dataloader/tests/test_data_loader.py +++ b/integrations/python/dataloader/tests/test_data_loader.py @@ -761,7 +761,7 @@ def test_planner_jvm_args_sets_libhdfs_opts(tmp_path, monkeypatch): catalog=catalog, database="db", table="tbl", - context=DataLoaderContext(jvm=JvmConfig(planner_args="-Xmx256m")), + context=DataLoaderContext(jvm_config=JvmConfig(planner_args="-Xmx256m")), ) assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx256m" From 457da7b4089c5290355310d5dd5d875ed17820f4 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 17:46:14 +0000 Subject: [PATCH 12/17] Update JvmConfig docstring to say 'storage access' --- .../python/dataloader/src/openhouse/dataloader/data_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index caf30fd6c..521249023 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -58,7 +58,7 @@ def _retry[T](fn: Callable[[], T], label: str, max_attempts: int) -> T: @dataclass class JvmConfig: - """JVM arguments for JNI-based storage (e.g. HDFS via libhdfs). + """JVM arguments for JNI-based storage access (e.g. HDFS via libhdfs). The JVM is created once per process. If another library has already started a JVM these arguments will have no effect. From 02bbb42db67c5f08d262d50ebab87461abf1799e Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 18:18:59 +0000 Subject: [PATCH 13/17] Make apply_libhdfs_opts idempotent and thread-safe Skip appending if jvm_args already present in LIBHDFS_OPTS. Use a threading lock to prevent concurrent threads from duplicating args. --- .../dataloader/src/openhouse/dataloader/_jvm.py | 15 +++++++++++---- integrations/python/dataloader/tests/test_jvm.py | 6 ++++++ 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py index a993843ba..46980685d 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py @@ -1,18 +1,25 @@ """JVM configuration utilities for the HDFS client.""" import os +import threading LIBHDFS_OPTS_ENV = "LIBHDFS_OPTS" """Environment variable read by libhdfs when starting the JNI JVM.""" +_lock = threading.Lock() + def apply_libhdfs_opts(jvm_args: str) -> None: """Merge *jvm_args* into the JNI JVM options environment variable. Appends to any existing value. Must be called before the first HDFS access in the current process (the JVM is started once and - reads these options only at startup). + reads these options only at startup). Thread-safe and idempotent — + duplicate args are not appended. 
""" - existing = os.environ.get(LIBHDFS_OPTS_ENV, "") - merged = f"{existing} {jvm_args}".strip() if existing else jvm_args - os.environ[LIBHDFS_OPTS_ENV] = merged + with _lock: + existing = os.environ.get(LIBHDFS_OPTS_ENV, "") + if jvm_args in existing: + return + merged = f"{existing} {jvm_args}".strip() if existing else jvm_args + os.environ[LIBHDFS_OPTS_ENV] = merged diff --git a/integrations/python/dataloader/tests/test_jvm.py b/integrations/python/dataloader/tests/test_jvm.py index 09702dd8f..eccec1a1f 100644 --- a/integrations/python/dataloader/tests/test_jvm.py +++ b/integrations/python/dataloader/tests/test_jvm.py @@ -20,3 +20,9 @@ def test_appends_to_existing(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv(LIBHDFS_OPTS_ENV, "-Xmx256m") apply_libhdfs_opts("-verbose:gc") assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx256m -verbose:gc" + + +def test_skips_duplicate_args() -> None: + apply_libhdfs_opts("-Xmx512m") + apply_libhdfs_opts("-Xmx512m") + assert os.environ[LIBHDFS_OPTS_ENV] == "-Xmx512m" From e9c3ec32f4e7de3307146816e84773bcbf6cd6e0 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Tue, 7 Apr 2026 18:29:55 +0000 Subject: [PATCH 14/17] Add worker_jvm_args to TableScanContext docstring; remove temp file cleanup --- .../dataloader/src/openhouse/dataloader/_table_scan_context.py | 1 + integrations/python/dataloader/tests/integration_tests.py | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py index b8a0dab25..ae20bd9c5 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_table_scan_context.py @@ -41,6 +41,7 @@ class TableScanContext: projected_schema: Subset of columns to read (equals table schema when no projection) table_id: Identifier for the table being scanned row_filter: Row-level filter expression pushed down to the scan + worker_jvm_args: JVM arguments applied when the JNI JVM is created in worker processes """ table_metadata: TableMetadata diff --git a/integrations/python/dataloader/tests/integration_tests.py b/integrations/python/dataloader/tests/integration_tests.py index 2f5755b90..3ccfe4f48 100644 --- a/integrations/python/dataloader/tests/integration_tests.py +++ b/integrations/python/dataloader/tests/integration_tests.py @@ -122,7 +122,6 @@ def _assert_jvm_heap(log_path: str, requested_mb: int, upper_bound_mb: int, labe """Read a JVM flags log file, assert MaxHeapSize <= upper_bound, and return the actual value.""" with open(log_path) as f: output = f.read() - os.unlink(log_path) assert "MaxHeapSize" in output, f"{label} JVM did not print flags — jvm_args not honored" heap = _parse_max_heap_bytes(output) assert heap <= upper_bound_mb * 1024 * 1024, ( From 2823a3c15db35ffe03f87ebce3f902a1a6410862 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 8 Apr 2026 21:30:10 +0000 Subject: [PATCH 15/17] Address PR review comments - Log info message when LIBHDFS_OPTS is set - Mention LIBHDFS_OPTS in DataLoaderContext.jvm_config docstring - Skip transform session creation when batch is empty --- .../python/dataloader/src/openhouse/dataloader/_jvm.py | 4 ++++ .../dataloader/src/openhouse/dataloader/data_loader.py | 3 ++- .../src/openhouse/dataloader/data_loader_split.py | 9 +++++---- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git 
a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py index 46980685d..550a17dbd 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/_jvm.py @@ -1,8 +1,11 @@ """JVM configuration utilities for the HDFS client.""" +import logging import os import threading +logger = logging.getLogger(__name__) + LIBHDFS_OPTS_ENV = "LIBHDFS_OPTS" """Environment variable read by libhdfs when starting the JNI JVM.""" @@ -23,3 +26,4 @@ def apply_libhdfs_opts(jvm_args: str) -> None: return merged = f"{existing} {jvm_args}".strip() if existing else jvm_args os.environ[LIBHDFS_OPTS_ENV] = merged + logger.info("Set %s=%s", LIBHDFS_OPTS_ENV, merged) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 521249023..086893ca8 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -90,7 +90,8 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation - jvm_config: JVM configuration for JNI-based storage access (e.g. HDFS). See :class:`JvmConfig`. + jvm_config: JVM configuration for JNI-based storage access (e.g. HDFS). Args are applied + via the ``LIBHDFS_OPTS`` environment variable. See :class:`JvmConfig`. """ execution_context: Mapping[str, str] | None = None diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py index f902f1cfd..38331bbe4 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader_split.py @@ -98,11 +98,12 @@ def __iter__(self) -> Iterator[RecordBatch]: # any UDF registration code can trigger JNI. 
batch_iter = iter(batches) first = next(batch_iter, None) +            if first is None: +                return session = _create_transform_session(self._scan_context.table_id, self._udf_registry) -            if first is not None: -                yield from self._apply_transform(session, first) -            for batch in batch_iter: -                yield from self._apply_transform(session, batch) +            yield from self._apply_transform(session, first) +            for batch in batch_iter: +                yield from self._apply_transform(session, batch) def _apply_transform(self, session: SessionContext, batch: RecordBatch) -> Iterator[RecordBatch]: """Execute the transform SQL against a single RecordBatch.""" From e3024fc77c0e9dd8b1fa99a38847f1a4d21a4900 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 8 Apr 2026 21:40:13 +0000 Subject: [PATCH 16/17] Make JvmConfig a frozen dataclass --- .../python/dataloader/src/openhouse/dataloader/data_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 086893ca8..287535296 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -56,7 +56,7 @@ def _retry[T](fn: Callable[[], T], label: str, max_attempts: int) -> T: raise AssertionError("unreachable")  # pragma: no cover -@dataclass +@dataclass(frozen=True) class JvmConfig: """JVM arguments for JNI-based storage access (e.g. HDFS via libhdfs). From 7be131df8e28f77964793e895df12a0f0866d869 Mon Sep 17 00:00:00 2001 From: Rob Reeves Date: Wed, 8 Apr 2026 21:41:10 +0000 Subject: [PATCH 17/17] Update DataLoaderContext.jvm_config docstring wording --- .../python/dataloader/src/openhouse/dataloader/data_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py index 287535296..f2118d30b 100644 --- a/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py +++ b/integrations/python/dataloader/src/openhouse/dataloader/data_loader.py @@ -90,7 +90,7 @@ class DataLoaderContext: execution_context: Dictionary of execution context information (e.g. tenant, environment) table_transformer: Transformation to apply to the table before loading (e.g. column masking) udf_registry: UDFs required for the table transformation -    jvm_config: JVM configuration for JNI-based storage access (e.g. HDFS). Args are applied +    jvm_config: JVM configuration for JNI-based storage access; currently only HDFS is supported. Args are applied via the ``LIBHDFS_OPTS`` environment variable. See :class:`JvmConfig`. """
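
Usage sketch (illustrative, not part of the patches above): a minimal example of how a caller would combine the final API from PATCH 17 with a spawned worker process so that worker_args can take effect. The catalog object and the _read_split helper are placeholders, not names from the series; worker_args is only honored in a process whose JNI JVM has not started yet, which is why the split is materialized in a "spawn" child, mirroring the integration test.

    import multiprocessing

    from openhouse.dataloader import DataLoaderContext, JvmConfig, OpenHouseDataLoader

    def _read_split(split):
        # Fresh child process: iterating the split applies worker_args to
        # LIBHDFS_OPTS before the JNI JVM starts, so -Xmx4g is honored here.
        for batch in split:
            print(batch.num_rows)

    if __name__ == "__main__":
        ctx = DataLoaderContext(jvm_config=JvmConfig(planner_args="-Xmx2g", worker_args="-Xmx4g"))
        # `catalog` stands in for an OpenHouseCatalog configured elsewhere.
        loader = OpenHouseDataLoader(catalog=catalog, database="db", table="tbl", context=ctx)
        spawn = multiprocessing.get_context("spawn")
        for split in loader:  # planning runs in this process under planner_args
            proc = spawn.Process(target=_read_split, args=(split,))
            proc.start()
            proc.join()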