From 019179ee17d3ae1b5051194d47e7eb1ac883adc3 Mon Sep 17 00:00:00 2001 From: "Matthew.Deshotel" Date: Mon, 30 Mar 2026 10:34:32 -0500 Subject: [PATCH 01/10] add handling for multiple read/write permissions issues --- .../historical_forcing.py | 62 ++++++++++++++++--- 1 file changed, 54 insertions(+), 8 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 7fdbd06b..07e9c5ac 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -1,15 +1,22 @@ """Module for processing AORC and NWM data.""" import datetime +import hashlib import os import re +import shutil +import tempfile import typing +import warnings from contextlib import contextmanager from datetime import timedelta from functools import cached_property -from time import perf_counter +from time import perf_counter, sleep import dask + +# Use the Error, Warning, and Trapping System Package for logging +import ewts import geopandas as gpd import matplotlib.pyplot as plt import numpy as np @@ -27,12 +34,11 @@ ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig -# Use the Error, Warning, and Trapping System Package for logging -import ewts LOG = ewts.get_logger(ewts.FORCING_ID) zarr.config.set({"async.concurrency": 100}) + class BaseProcessor: """Base class for data processors.""" @@ -148,7 +154,17 @@ def gpkg_name(self) -> str: @property def nc_path(self) -> str: """Construct file path for cached netcdf files.""" - return f"/tmp/{self.dataset_name}_{self.gpkg_name}_{self.current_time_str}_{self.end_time_str}.nc" + return f"/tmp/{self.cache_key}.nc" + + @property + def cache_key(self): + return f"{self.dataset_name}_{self.gpkg_name}_{self.current_time_str}_{self.end_time_str}" + + @property + def nc_tmp_hash_path(self) -> str: + """Construct file path for cached netcdf files.""" + cache_hash = hashlib.md5(self.cache_key.encode()).hexdigest()[:8] + return f"/tmp/{cache_hash}.nc" @property def end_time_datetime(self) -> pd.Timestamp: @@ -243,7 +259,9 @@ def compute_ds(self) -> xr.Dataset: self.mpi_config.comm.barrier() ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: - ds.to_netcdf(self.nc_path) + ds.to_netcdf(self.nc_tmp_hash_path) + shutil.copy(self.nc_tmp_hash_path, self.nc_path) + os.remove(self.nc_tmp_hash_path) return ds @cached_property @@ -370,7 +388,16 @@ def sliced_ds(self) -> xr.Dataset: try: if os.path.exists(self.nc_path): with self.timing_block(f"opening local dataset {self.nc_path}"): - return xr.open_dataset(self.nc_path) + c = 0 + while c < 10: + try: + return xr.open_dataset(self.nc_path) + except Exception as e: + warnings.warn( + f"loc on cache file; sleeping 1s({c}). Error: {e}" + ) + sleep(1) + c += 1 else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds( @@ -602,7 +629,16 @@ def sliced_ds(self) -> xr.Dataset: try: if os.path.exists(self.nc_path): with self.timing_block(f"opening local dataset {self.nc_path}"): - return xr.open_dataset(self.nc_path) + c = 0 + while c < 10: + try: + return xr.open_dataset(self.nc_path) + except Exception as e: + warnings.warn( + f"loc on cache file; sleeping 1s({c}). Error: {e}" + ) + sleep(1) + c += 1 else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( @@ -656,7 +692,17 @@ def sliced_ds(self) -> xr.Dataset: try: if os.path.exists(self.nc_path): with self.timing_block(f"opening local dataset {self.nc_path}"): - return xr.open_dataset(self.nc_path) + c = 0 + while c < 10: + try: + return xr.open_dataset(self.nc_path) + except Exception as e: + warnings.warn( + f"loc on cache file; sleeping 1s({c}). Error: {e}" + ) + sleep(1) + c += 1 + else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( From 82bcf38052575643ae2491c239ace15bda43e6dd Mon Sep 17 00:00:00 2001 From: "Matthew.Deshotel" Date: Mon, 30 Mar 2026 11:30:33 -0500 Subject: [PATCH 02/10] add error after 10 attempts of trying to read local chache file --- .../NextGen_Forcings_Engine/historical_forcing.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 07e9c5ac..160304d9 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -394,10 +394,11 @@ def sliced_ds(self) -> xr.Dataset: return xr.open_dataset(self.nc_path) except Exception as e: warnings.warn( - f"loc on cache file; sleeping 1s({c}). Error: {e}" + f"Lock on cache file; sleeping 1s({c}). Error: {e}" ) sleep(1) c += 1 + raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds( @@ -635,10 +636,11 @@ def sliced_ds(self) -> xr.Dataset: return xr.open_dataset(self.nc_path) except Exception as e: warnings.warn( - f"loc on cache file; sleeping 1s({c}). Error: {e}" + f"Lock on cache file; sleeping 1s({c}). Error: {e}" ) sleep(1) c += 1 + raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( @@ -698,11 +700,11 @@ def sliced_ds(self) -> xr.Dataset: return xr.open_dataset(self.nc_path) except Exception as e: warnings.warn( - f"loc on cache file; sleeping 1s({c}). Error: {e}" + f"Lock on cache file; sleeping 1s({c}). Error: {e}" ) sleep(1) c += 1 - + raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( From d0d785f8d5fe9229971ec7bef1df2a758e9af09e Mon Sep 17 00:00:00 2001 From: Matthew Deshotel Date: Wed, 1 Apr 2026 06:23:24 -0500 Subject: [PATCH 03/10] implement retries for writing netcdf cache file --- .../historical_forcing.py | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 160304d9..884b7f8e 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -154,17 +154,18 @@ def gpkg_name(self) -> str: @property def nc_path(self) -> str: """Construct file path for cached netcdf files.""" - return f"/tmp/{self.cache_key}.nc" + return f"/tmp/{self.cache_filename}.nc" @property - def cache_key(self): + def cache_filename(self): + """Cache filename.""" return f"{self.dataset_name}_{self.gpkg_name}_{self.current_time_str}_{self.end_time_str}" - @property - def nc_tmp_hash_path(self) -> str: - """Construct file path for cached netcdf files.""" - cache_hash = hashlib.md5(self.cache_key.encode()).hexdigest()[:8] - return f"/tmp/{cache_hash}.nc" + # @property + # def nc_tmp_hash_path(self) -> str: + # """Construct file path for cached netcdf files.""" + # cache_hash = hashlib.md5(self.cache_key.encode()).hexdigest()[:8] + # return f"/tmp/{cache_hash}.nc" @property def end_time_datetime(self) -> pd.Timestamp: @@ -259,9 +260,19 @@ def compute_ds(self) -> xr.Dataset: self.mpi_config.comm.barrier() ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: - ds.to_netcdf(self.nc_tmp_hash_path) - shutil.copy(self.nc_tmp_hash_path, self.nc_path) - os.remove(self.nc_tmp_hash_path) + c = 0 + while c < 10: + try: + ds.to_netcdf(self.nc_path) + except PermissionError: + warnings.warn( + f"There appears to be a lock on the netcdf cache file while writing. Sleeping 1 second and trying again ({c})." + ) + sleep(1) + c += 1 + raise PermissionError( + f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path}" + ) return ds @cached_property @@ -398,7 +409,9 @@ def sliced_ds(self) -> xr.Dataset: ) sleep(1) c += 1 - raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") + raise ValueError( + f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + ) else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds( @@ -640,7 +653,9 @@ def sliced_ds(self) -> xr.Dataset: ) sleep(1) c += 1 - raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") + raise ValueError( + f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + ) else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( @@ -704,7 +719,9 @@ def sliced_ds(self) -> xr.Dataset: ) sleep(1) c += 1 - raise ValueError(f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}") + raise ValueError( + f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + ) else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( From cdc627ab31b3b461e80f005366a30a219b7a1437 Mon Sep 17 00:00:00 2001 From: Matthew Deshotel Date: Wed, 1 Apr 2026 09:26:06 -0500 Subject: [PATCH 04/10] split try-except into two try-excepts --- .../historical_forcing.py | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 884b7f8e..eabbbf7d 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -396,32 +396,31 @@ def sliced_ds(self) -> xr.Dataset: :return: xarray Dataset :raises Exception: If zarr open fails """ - try: - if os.path.exists(self.nc_path): - with self.timing_block(f"opening local dataset {self.nc_path}"): - c = 0 - while c < 10: - try: - return xr.open_dataset(self.nc_path) - except Exception as e: - warnings.warn( - f"Lock on cache file; sleeping 1s({c}). Error: {e}" - ) - sleep(1) - c += 1 - raise ValueError( - f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - ) - else: + if os.path.exists(self.nc_path): + with self.timing_block(f"opening local dataset {self.nc_path}"): + c = 0 + while c < 10: + try: + return xr.open_dataset(self.nc_path) + except Exception as e: + warnings.warn( + f"Lock on cache file; sleeping 1s({c}). Error: {e}" + ) + sleep(1) + c += 1 + error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + LOG.critical(error_message) + raise ValueError(error_message) + else: + try: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds( self.s3_lazy_ds[self.current_time.year] ).rename({self.x_label: "x", self.y_label: "y"}) - except Exception as e: - LOG.critical( - f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" - ) - raise e + except Exception as e: + error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" + LOG.critical(error_message) + raise ValueError(error_message) @cached_property def s3_lazy_ds(self) -> dict[int, xr.Dataset]: From 1ad5aa6f0658b9e33cbbce2a2805c8b66e52c3fc Mon Sep 17 00:00:00 2001 From: Matthew Deshotel Date: Wed, 1 Apr 2026 09:34:42 -0500 Subject: [PATCH 05/10] fix while loop logic --- .../historical_forcing.py | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index eabbbf7d..a1fc8dd7 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -264,15 +264,16 @@ def compute_ds(self) -> xr.Dataset: while c < 10: try: ds.to_netcdf(self.nc_path) - except PermissionError: + except Exception as e: warnings.warn( f"There appears to be a lock on the netcdf cache file while writing. Sleeping 1 second and trying again ({c})." ) sleep(1) c += 1 - raise PermissionError( - f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path}" - ) + if c == 10: + raise PermissionError( + f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path} | Error: {e}" + ) return ds @cached_property @@ -401,16 +402,17 @@ def sliced_ds(self) -> xr.Dataset: c = 0 while c < 10: try: - return xr.open_dataset(self.nc_path) + return xr.open_dataset(self.nc_path, engine="netcdf4") except Exception as e: warnings.warn( f"Lock on cache file; sleeping 1s({c}). Error: {e}" ) sleep(1) c += 1 - error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - LOG.critical(error_message) - raise ValueError(error_message) + if c == 10: + error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + LOG.critical(error_message) + raise ValueError(error_message) else: try: with self.timing_block(f"lazy loading {self.dataset_name} data"): @@ -652,9 +654,10 @@ def sliced_ds(self) -> xr.Dataset: ) sleep(1) c += 1 - raise ValueError( - f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - ) + if c == 10: + raise ValueError( + f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + ) else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( @@ -718,9 +721,10 @@ def sliced_ds(self) -> xr.Dataset: ) sleep(1) c += 1 - raise ValueError( - f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - ) + if c == 10: + raise ValueError( + f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" + ) else: with self.timing_block(f"lazy loading {self.dataset_name} data"): return self.slice_ds(self.s3_lazy_ds).rename( From 89e0412addb09b54ec568f9bb1bb0faf77b54b75 Mon Sep 17 00:00:00 2001 From: Matthew Deshotel Date: Wed, 1 Apr 2026 09:46:50 -0500 Subject: [PATCH 06/10] initialize e --- .../NextGen_Forcings_Engine/historical_forcing.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index a1fc8dd7..1e1ce3f4 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -260,7 +260,7 @@ def compute_ds(self) -> xr.Dataset: self.mpi_config.comm.barrier() ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: - c = 0 + e = c = 0 while c < 10: try: ds.to_netcdf(self.nc_path) From 9fa03be9bba4de115a3df59a792b4fe4662989ac Mon Sep 17 00:00:00 2001 From: "Matthew.Deshotel" Date: Wed, 1 Apr 2026 10:01:43 -0500 Subject: [PATCH 07/10] open cache file with context manager and load data to memory --- .../NextGen_Forcings_Engine/historical_forcing.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 1e1ce3f4..d7b25ce3 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -402,7 +402,8 @@ def sliced_ds(self) -> xr.Dataset: c = 0 while c < 10: try: - return xr.open_dataset(self.nc_path, engine="netcdf4") + with xr.open_dataset(self.nc_path, engine="netcdf4") as ds: + return ds.load() except Exception as e: warnings.warn( f"Lock on cache file; sleeping 1s({c}). Error: {e}" From 3b4516e75e96ea04ebc104e76ae2a16bfda5796c Mon Sep 17 00:00:00 2001 From: "Matthew.Deshotel" Date: Wed, 1 Apr 2026 11:22:18 -0500 Subject: [PATCH 08/10] load ds with context manager and remove if locked. --- .../historical_forcing.py | 58 ++++++++++--------- 1 file changed, 31 insertions(+), 27 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index d7b25ce3..6a1f1f6d 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -260,20 +260,22 @@ def compute_ds(self) -> xr.Dataset: self.mpi_config.comm.barrier() ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: - e = c = 0 - while c < 10: - try: - ds.to_netcdf(self.nc_path) - except Exception as e: - warnings.warn( - f"There appears to be a lock on the netcdf cache file while writing. Sleeping 1 second and trying again ({c})." + if not os.path.exists(self.nc_path): + c = 0 + while c < 10: + try: + ds.to_netcdf(self.nc_path, "w") + break + except Exception as e: + warnings.warn( + f"There appears to be a lock on the netcdf cache file while writing. Sleeping 1 second and trying again ({c}). | Error: {e}" + ) + sleep(1) + c += 1 + if c == 10: + raise PermissionError( + f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path}" ) - sleep(1) - c += 1 - if c == 10: - raise PermissionError( - f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path} | Error: {e}" - ) return ds @cached_property @@ -403,7 +405,8 @@ def sliced_ds(self) -> xr.Dataset: while c < 10: try: with xr.open_dataset(self.nc_path, engine="netcdf4") as ds: - return ds.load() + dataset = ds.load() + return dataset except Exception as e: warnings.warn( f"Lock on cache file; sleeping 1s({c}). Error: {e}" @@ -411,19 +414,20 @@ def sliced_ds(self) -> xr.Dataset: sleep(1) c += 1 if c == 10: - error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - LOG.critical(error_message) - raise ValueError(error_message) - else: - try: - with self.timing_block(f"lazy loading {self.dataset_name} data"): - return self.slice_ds( - self.s3_lazy_ds[self.current_time.year] - ).rename({self.x_label: "x", self.y_label: "y"}) - except Exception as e: - error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" - LOG.critical(error_message) - raise ValueError(error_message) + error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}. Deleteing the cache file and recreating from s3" + warnings.warn(error_message) + os.remove(self.nc_path) + # LOG.critical(error_message) + # raise ValueError(error_message) + try: + with self.timing_block(f"lazy loading {self.dataset_name} data"): + return self.slice_ds(self.s3_lazy_ds[self.current_time.year]).rename( + {self.x_label: "x", self.y_label: "y"} + ) + except Exception as e: + error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" + LOG.critical(error_message) + raise ValueError(error_message) @cached_property def s3_lazy_ds(self) -> dict[int, xr.Dataset]: From fa2f263a9322009db63610249b594a9dafc2492b Mon Sep 17 00:00:00 2001 From: Matthew Deshotel Date: Thu, 2 Apr 2026 11:10:57 -0500 Subject: [PATCH 09/10] add load cache function --- .../historical_forcing.py | 150 +++++++----------- 1 file changed, 60 insertions(+), 90 deletions(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 6a1f1f6d..949dbd61 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -1,20 +1,14 @@ """Module for processing AORC and NWM data.""" import datetime -import hashlib +import gc import os -import re -import shutil -import tempfile import typing -import warnings from contextlib import contextmanager from datetime import timedelta from functools import cached_property from time import perf_counter, sleep -import dask - # Use the Error, Warning, and Trapping System Package for logging import ewts import geopandas as gpd @@ -161,12 +155,6 @@ def cache_filename(self): """Cache filename.""" return f"{self.dataset_name}_{self.gpkg_name}_{self.current_time_str}_{self.end_time_str}" - # @property - # def nc_tmp_hash_path(self) -> str: - # """Construct file path for cached netcdf files.""" - # cache_hash = hashlib.md5(self.cache_key.encode()).hexdigest()[:8] - # return f"/tmp/{cache_hash}.nc" - @property def end_time_datetime(self) -> pd.Timestamp: """Datetime object for the end time step.""" @@ -256,7 +244,7 @@ def compute_ds(self) -> xr.Dataset: ds = None if self.mpi_config.rank == 0: with self.timing_block("computing dataset", LOG.info): - ds = self.sliced_ds.compute().rio.write_crs(self.src_crs) + ds = self.sliced_ds.rio.write_crs(self.src_crs) self.mpi_config.comm.barrier() ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: @@ -267,14 +255,14 @@ def compute_ds(self) -> xr.Dataset: ds.to_netcdf(self.nc_path, "w") break except Exception as e: - warnings.warn( + LOG.warning( f"There appears to be a lock on the netcdf cache file while writing. Sleeping 1 second and trying again ({c}). | Error: {e}" ) sleep(1) c += 1 - if c == 10: + else: raise PermissionError( - f"Could write the netcdf cache file within the specified number of retries(10): {self.nc_path}" + f"Could not write the netcdf cache file within the specified number of retries(10): {self.nc_path}" ) return ds @@ -354,6 +342,27 @@ def slice_ds(self, ds: xr.Dataset) -> xr.Dataset: ) return sliced_ds + def load_cache(self) -> xr.Dataset | None: + """Load the cahed netcdf file.""" + if os.path.exists(self.nc_path): + with self.timing_block(f"opening local dataset {self.nc_path}"): + c = 0 + while c < 10: + try: + ds = xr.open_dataset(self.nc_path) + dataset = ds.load() + ds.close() + gc.collect() + return dataset + except Exception as e: + LOG.warning(f"Lock on cache file; sleeping 1s({c}). Error: {e}") + sleep(1) + c += 1 + + error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}. Deleteing the cache file and recreating from s3" + LOG.warning(error_message) + os.remove(self.nc_path) + class AORCConusProcessor(BaseProcessor): """Processor for CONUS AORC data.""" @@ -399,30 +408,15 @@ def sliced_ds(self) -> xr.Dataset: :return: xarray Dataset :raises Exception: If zarr open fails """ - if os.path.exists(self.nc_path): - with self.timing_block(f"opening local dataset {self.nc_path}"): - c = 0 - while c < 10: - try: - with xr.open_dataset(self.nc_path, engine="netcdf4") as ds: - dataset = ds.load() - return dataset - except Exception as e: - warnings.warn( - f"Lock on cache file; sleeping 1s({c}). Error: {e}" - ) - sleep(1) - c += 1 - if c == 10: - error_message = f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}. Deleteing the cache file and recreating from s3" - warnings.warn(error_message) - os.remove(self.nc_path) - # LOG.critical(error_message) - # raise ValueError(error_message) + cached_data = self.load_cache() + if cached_data is not None: + return cached_data try: with self.timing_block(f"lazy loading {self.dataset_name} data"): - return self.slice_ds(self.s3_lazy_ds[self.current_time.year]).rename( - {self.x_label: "x", self.y_label: "y"} + return ( + self.slice_ds(self.s3_lazy_ds[self.current_time.year]) + .rename({self.x_label: "x", self.y_label: "y"}) + .load() ) except Exception as e: error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" @@ -495,8 +489,10 @@ def sliced_ds(self) -> xr.Dataset: s3 = s3fs.S3FileSystem() with s3.open(self.url(date)) as f: ds = xr.open_dataset(f, engine="h5netcdf") + dataset = ds.load() + ds.close() datasets.append( - self.slice_ds(ds, date, date + np.timedelta64(1, "h")) + self.slice_ds(dataset, date, date + np.timedelta64(1, "h")) ) except Exception as e: LOG.critical( @@ -646,33 +642,20 @@ def sliced_ds(self) -> xr.Dataset: :return: xarray Dataset :raises Exception: If zarr open fails """ + cached_data = self.load_cache() + if cached_data is not None: + return cached_data try: - if os.path.exists(self.nc_path): - with self.timing_block(f"opening local dataset {self.nc_path}"): - c = 0 - while c < 10: - try: - return xr.open_dataset(self.nc_path) - except Exception as e: - warnings.warn( - f"Lock on cache file; sleeping 1s({c}). Error: {e}" - ) - sleep(1) - c += 1 - if c == 10: - raise ValueError( - f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - ) - else: - with self.timing_block(f"lazy loading {self.dataset_name} data"): - return self.slice_ds(self.s3_lazy_ds).rename( - {self.x_label: "x", self.y_label: "y"} - ) + with self.timing_block(f"lazy loading {self.dataset_name} data"): + return ( + self.slice_ds(self.s3_lazy_ds) + .rename({self.x_label: "x", self.y_label: "y"}) + .load() + ) except Exception as e: - LOG.critical( - f"Error opening {self.dataset_name} data from {self.url}: {e}\n" - ) - raise e + error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" + LOG.critical(error_message) + raise ValueError(error_message) @cached_property def s3_lazy_ds(self) -> xr.Dataset: @@ -713,33 +696,20 @@ def sliced_ds(self) -> xr.Dataset: :return: xarray Dataset :raises Exception: If zarr open fails """ + cached_data = self.load_cache() + if cached_data is not None: + return cached_data try: - if os.path.exists(self.nc_path): - with self.timing_block(f"opening local dataset {self.nc_path}"): - c = 0 - while c < 10: - try: - return xr.open_dataset(self.nc_path) - except Exception as e: - warnings.warn( - f"Lock on cache file; sleeping 1s({c}). Error: {e}" - ) - sleep(1) - c += 1 - if c == 10: - raise ValueError( - f"Exceeded number of attempts (10) to read local cache file for historical forcing data. File: {self.nc_path}" - ) - else: - with self.timing_block(f"lazy loading {self.dataset_name} data"): - return self.slice_ds(self.s3_lazy_ds).rename( - {self.x_label: "x", self.y_label: "y"} - ) + with self.timing_block(f"lazy loading {self.dataset_name} data"): + return ( + self.slice_ds(self.s3_lazy_ds) + .rename({self.x_label: "x", self.y_label: "y"}) + .load() + ) except Exception as e: - LOG.critical( - f"Error opening {self.dataset_name} data from {self.url}: {e}\n" - ) - raise e + error_message = f"Error opening {self.dataset_name} data from {self.url(self.current_time.year)}: {e}\n" + LOG.critical(error_message) + raise ValueError(error_message) @cached_property def src_crs(self) -> CRS: From 07306c8c42854b26808d88a5e16364213a75482d Mon Sep 17 00:00:00 2001 From: Max Kipp Date: Fri, 3 Apr 2026 13:57:36 -0400 Subject: [PATCH 10/10] Forcing cache file: write to random tmp file name then rename --- .../NextGen_Forcings_Engine/general_utils.py | 12 ++++++++++++ .../NextGen_Forcings_Engine/historical_forcing.py | 10 +++++++++- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/general_utils.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/general_utils.py index 953af214..93692dc2 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/general_utils.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/general_utils.py @@ -3,6 +3,7 @@ import json import logging import typing +import uuid import numpy as np @@ -168,3 +169,14 @@ def assert_equal_with_tol( if errors: raise ExpectVsActualError(errors) + + +def rand_str(length: int) -> str: + """Build and return a random string of length up to 32. + Note that if this is called by different MPI ranks, each rank will receive a different random string. + The string is not broadcasted.""" + if not (0 < length <= 32): + raise ValueError( + f"length requested was {length}, but this function only supports length 1 through 32" + ) + return str(uuid.uuid4()).replace("-", "")[:length] diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py index 949dbd61..4c7491f1 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/historical_forcing.py @@ -23,6 +23,7 @@ from pyproj import CRS from zarr.storage import ObjectStore +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.general_utils import rand_str from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( ConfigOptions, ) @@ -249,10 +250,17 @@ def compute_ds(self) -> xr.Dataset: ds = self.mpi_config.comm.bcast(ds, root=0) if self.mpi_config.rank == 0: if not os.path.exists(self.nc_path): + tmp_file = ( + f"{self.nc_path}.{rand_str(12)}{os.path.splitext(self.nc_path)[1]}" + ) c = 0 while c < 10: + LOG.info(f"Writing tmp file: {tmp_file}") try: - ds.to_netcdf(self.nc_path, "w") + ds.to_netcdf(tmp_file, "w") + LOG.info(f"Renaming: {tmp_file} -> {self.nc_path}") + os.replace(tmp_file, self.nc_path) + LOG.info(f"Renamed: {tmp_file} -> {self.nc_path}") break except Exception as e: LOG.warning(