diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py index 6858056a..2d287521 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/bmi_model.py @@ -24,15 +24,17 @@ from mpi4py import MPI from NextGen_Forcings_Engine_BMI import esmf_creation, forcing_extraction +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, +) +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.consts import GEOGRID +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig from .bmi_grid import Grid, GridType from .core import ( - config, err_handler, forcingInputMod, - geoMod, ioMod, - parallel, suppPrecipMod, ) from .model import NWMv3ForcingEngineModel @@ -207,7 +209,7 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: # If _job_meta was not set by initialize_with_params(), create a default one if self._job_meta is None: - self._job_meta = config.ConfigOptions(self.cfg_bmi) + self._job_meta = ConfigOptions(self.cfg_bmi) # Parse the configuration options try: @@ -231,7 +233,7 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: self._job_meta.nwmConfig = self.cfg_bmi["NWM_CONFIG"] # Initialize MPI communication - self._mpi_meta = parallel.MpiConfig() + self._mpi_meta = MpiConfig() try: comm = MPI.Comm.f2py(self._comm) if self._comm is not None else None self._mpi_meta.initialize_comm(self._job_meta, comm=comm) @@ -252,24 +254,14 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: # information about the modeling domain, local processor # grid boundaries, and ESMF grid objects/fields to be used # in regridding. - self._wrf_hydro_geo_meta = geoMod.GeoMetaWrfHydro() - - if self._job_meta.grid_type == "gridded": - self._wrf_hydro_geo_meta.initialize_destination_geo_gridded( - self._job_meta, self._mpi_meta - ) - elif self._job_meta.grid_type == "unstructured": - self._wrf_hydro_geo_meta.initialize_destination_geo_unstructured( - self._job_meta, self._mpi_meta - ) - elif self._job_meta.grid_type == "hydrofabric": - self._wrf_hydro_geo_meta.initialize_destination_geo_hydrofabric( - self._job_meta, self._mpi_meta - ) - else: - self._job_meta.errMsg = "You must specify a proper grid_type (gridded, unstructured, hydrofabric) in the config." + if self._job_meta.grid_type not in GEOGRID: + self._job_meta.errMsg = f"Invalid grid type specified: {self._job_meta.grid_type}. Valid options are: {list(GEOGRID.keys())}" err_handler.err_out_screen_para(self._job_meta.errMsg, self._mpi_meta) + self._wrf_hydro_geo_meta = GEOGRID.get(self._job_meta.grid_type)( + self._job_meta, self._mpi_meta + ) + # Assign grid type to BMI class for grid information self._grid_type = self._job_meta.grid_type.lower() @@ -761,9 +753,7 @@ def initialize(self, config_file: str, output_path: str | None = None) -> None: if self._job_meta.spatial_meta is not None: try: - self._wrf_hydro_geo_meta.initialize_geospatial_metadata( - self._job_meta, self._mpi_meta - ) + self._wrf_hydro_geo_meta.initialize_geospatial_metadata() except Exception as e: err_handler.err_out_screen_para(self._job_meta.errMsg, self._mpi_meta) err_handler.check_program_status(self._job_meta, self._mpi_meta) @@ -897,9 +887,7 @@ def initialize_with_params( :raises ValueError: If an invalid grid type is specified, an exception is raised. """ # Set the job metadata parameters (b_date, geogrid) using config_options - self._job_meta = config.ConfigOptions( - self.cfg_bmi, b_date=b_date, geogrid_arg=geogrid - ) + self._job_meta = ConfigOptions(self.cfg_bmi, b_date=b_date, geogrid_arg=geogrid) # Now that _job_meta is set, call initialize() to set up the core model self.initialize(config_file, output_path=output_path) diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py new file mode 100644 index 00000000..05759781 --- /dev/null +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/consts.py @@ -0,0 +1,11 @@ +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.geoMod import ( + GriddedGeoMeta, + HydrofabricGeoMeta, + UnstructuredGeoMeta, +) + +GEOGRID = { + "gridded": GriddedGeoMeta, + "unstructured": UnstructuredGeoMeta, + "hydrofabric": HydrofabricGeoMeta, +} diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py index 9931100b..5e94f65d 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/forcingInputMod.py @@ -12,7 +12,7 @@ ConfigOptions, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.geoMod import ( - GeoMetaWrfHydro, + GeoMeta, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig from nextgen_forcings_ewts import MODULE_NAME @@ -963,7 +963,7 @@ def regrid_map(self): def regrid_inputs( self, config_options: ConfigOptions, - wrf_hyro_geo_meta: GeoMetaWrfHydro, + wrf_hyro_geo_meta: GeoMeta, mpi_config: MpiConfig, ): """Regrid input forcings to the final output grids for this timestep. @@ -1012,7 +1012,7 @@ def temporal_interpolate_inputs( def init_dict( config_options: ConfigOptions, - geo_meta_wrf_hydro: GeoMetaWrfHydro, + geo_meta_wrf_hydro: GeoMeta, mpi_config: MpiConfig, ) -> dict: """Initialize the input forcing dictionary. diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/geoMod.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/geoMod.py index 8902ebbe..e5b0995c 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/geoMod.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/geoMod.py @@ -1,5 +1,4 @@ import math -from time import time import netCDF4 import numpy as np @@ -19,30 +18,28 @@ import logging +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ( + ConfigOptions, +) +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig from nextgen_forcings_ewts import MODULE_NAME LOG = logging.getLogger(MODULE_NAME) -class GeoMetaWrfHydro: - """Abstract class for handling information about the WRF-Hydro domain we are processing forcings too.""" +class GeoMeta: + """Abstract class for handling information about the WRF-Hydro domain we are processing forcings to.""" - def __init__(self): - """Initialize GeoMetaWrfHydro class variables.""" + def __init__(self, config_options: ConfigOptions, mpi_config: MpiConfig): + """Initialize GeoMeta class variables.""" + self.config_options = config_options + self.mpi_config = mpi_config self.nx_global = None self.ny_global = None self.nx_global_elem = None self.ny_global_elem = None self.dx_meters = None self.dy_meters = None - self.nx_local = None - self.ny_local = None - self.nx_local_elem = None - self.ny_local_elem = None - self.x_lower_bound = None - self.x_upper_bound = None - self.y_lower_bound = None - self.y_upper_bound = None self.latitude_grid = None self.longitude_grid = None self.element_ids = None @@ -74,40 +71,268 @@ def __init__(self): self.y_coords = None self.spatial_global_atts = None - def get_processor_bounds(self, config_options): - """Calculate the local grid boundaries for this processor. + def initialize_geospatial_metadata(self): + """Initialize GeoMetaWrfHydro class variables. - ESMF operates under the hood and the boundary values - are calculated within the ESMF software. + Function that will read in crs/x/y geospatial metadata and coordinates + from the optional geospatial metadata file IF it was specified by the user in + the configuration file. :return: """ - if config_options.grid_type == "gridded": - self.x_lower_bound = self.esmf_grid.lower_bounds[ESMF.StaggerLoc.CENTER][1] - self.x_upper_bound = self.esmf_grid.upper_bounds[ESMF.StaggerLoc.CENTER][1] - self.y_lower_bound = self.esmf_grid.lower_bounds[ESMF.StaggerLoc.CENTER][0] - self.y_upper_bound = self.esmf_grid.upper_bounds[ESMF.StaggerLoc.CENTER][0] - self.nx_local = self.x_upper_bound - self.x_lower_bound - self.ny_local = self.y_upper_bound - self.y_lower_bound - elif config_options.grid_type == "unstructured": - self.nx_local = len(self.esmf_grid.coords[0][1]) - self.ny_local = len(self.esmf_grid.coords[0][1]) - self.nx_local_elem = len(self.esmf_grid.coords[1][1]) - self.ny_local_elem = len(self.esmf_grid.coords[1][1]) - # LOG.debug("ESMF Mesh nx local node is " + str(self.nx_local)) - # LOG.debug("ESMF Mesh nx local elem is " + str(self.nx_local_elem)) - elif config_options.grid_type == "hydrofabric": - self.nx_local = len(self.esmf_grid.coords[1][1]) - self.ny_local = len(self.esmf_grid.coords[1][1]) - # self.nx_local_poly = len(self.esmf_poly_coords) - # self.ny_local_poly = len(self.esmf_poly_coords) - # LOG.debug("ESMF Mesh nx local elem is " + str(self.nx_local)) - # LOG.debug("ESMF Mesh nx local poly is " + str(self.nx_local_poly)) - # LOG.debug("WRF-HYDRO LOCAL X BOUND 1 = " + str(self.x_lower_bound)) - # LOG.debug("WRF-HYDRO LOCAL X BOUND 2 = " + str(self.x_upper_bound)) - # LOG.debug("WRF-HYDRO LOCAL Y BOUND 1 = " + str(self.y_lower_bound)) - # LOG.debug("WRF-HYDRO LOCAL Y BOUND 2 = " + str(self.y_upper_bound)) - - def initialize_destination_geo_gridded(self, config_options, mpi_config): + # We will only read information on processor 0. This data is not necessary for the + # other processors, and is only used in the output routines. + if self.mpi_config.rank == 0: + # Open the geospatial metadata file. + try: + esmf_nc = netCDF4.Dataset(self.config_options.spatial_meta, "r") + except Exception as e: + self.config_options.errMsg = f"Unable to open spatial metadata file: {self.config_options.spatial_meta}" + raise e + + # Make sure the expected variables are present in the file. + if "crs" not in esmf_nc.variables.keys(): + self.config_options.errMsg = f"Unable to locate crs variable in: {self.config_options.spatial_meta}" + raise Exception + if "x" not in esmf_nc.variables.keys(): + self.config_options.errMsg = f"Unable to locate x variable in: {self.config_options.spatial_meta}" + raise Exception + if "y" not in esmf_nc.variables.keys(): + self.config_options.errMsg = f"Unable to locate y variable in: {self.config_options.spatial_meta}" + raise Exception + # Extract names of variable attributes from each of the input geospatial variables. These + # can change, so we are making this as flexible as possible to accomodate future changes. + try: + crs_att_names = esmf_nc.variables["crs"].ncattrs() + except Exception as e: + self.config_options.errMsg = f"Unable to extract crs attribute names from: {self.config_options.spatial_meta}" + raise e + try: + x_coord_att_names = esmf_nc.variables["x"].ncattrs() + except Exception as e: + self.config_options.errMsg = f"Unable to extract x attribute names from: {self.config_options.spatial_meta}" + raise e + try: + y_coord_att_names = esmf_nc.variables["y"].ncattrs() + except Exception as e: + self.config_options.errMsg = f"Unable to extract y attribute names from: {self.config_options.spatial_meta}" + raise e + # Extract attribute values + try: + self.x_coord_atts = { + item: esmf_nc.variables["x"].getncattr(item) + for item in x_coord_att_names + } + except Exception as e: + self.config_options.errMsg = f"Unable to extract x coordinate attributes from: {self.config_options.spatial_meta}" + raise e + try: + self.y_coord_atts = { + item: esmf_nc.variables["y"].getncattr(item) + for item in y_coord_att_names + } + except Exception as e: + self.config_options.errMsg = f"Unable to extract y coordinate attributes from: {self.config_options.spatial_meta}" + raise e + try: + self.crs_atts = { + item: esmf_nc.variables["crs"].getncattr(item) + for item in crs_att_names + } + except Exception as e: + self.config_options.errMsg = f"Unable to extract crs coordinate attributes from: {self.config_options.spatial_meta}" + raise e + + # Extract global attributes + try: + global_att_names = esmf_nc.ncattrs() + except Exception as e: + self.config_options.errMsg = f"Unable to extract global attribute names from: {self.config_options.spatial_meta}" + raise e + try: + self.spatial_global_atts = { + item: esmf_nc.getncattr(item) for item in global_att_names + } + except Exception as e: + self.config_options.errMsg = f"Unable to extract global attributes from: {self.config_options.spatial_meta}" + raise e + + # Extract x/y coordinate values + if len(esmf_nc.variables["x"].shape) == 1: + try: + self.x_coords = esmf_nc.variables["x"][:].data + except Exception as e: + self.config_options.errMsg = f"Unable to extract x coordinate values from: {self.config_options.spatial_meta}" + raise e + try: + self.y_coords = esmf_nc.variables["y"][:].data + except Exception as e: + self.config_options.errMsg = f"Unable to extract y coordinate values from: {self.config_options.spatial_meta}" + raise e + # Check to see if the Y coordinates are North-South. If so, flip them. + if self.y_coords[1] < self.y_coords[0]: + self.y_coords[:] = np.flip(self.y_coords[:], axis=0) + + if len(esmf_nc.variables["x"].shape) == 2: + try: + self.x_coords = esmf_nc.variables["x"][:, :].data + except Exception as e: + self.config_options.errMsg = f"Unable to extract x coordinate values from: {self.config_options.spatial_meta}" + raise e + try: + self.y_coords = esmf_nc.variables["y"][:, :].data + except Exception as e: + self.config_options.errMsg = f"Unable to extract y coordinate values from: {self.config_options.spatial_meta}" + raise e + # Check to see if the Y coordinates are North-South. If so, flip them. + if self.y_coords[1, 0] > self.y_coords[0, 0]: + self.y_coords[:, :] = np.flipud(self.y_coords[:, :]) + + # Close the geospatial metadata file. + try: + esmf_nc.close() + except Exception as e: + self.config_options.errMsg = f"Unable to close spatial metadata file: {self.config_options.spatial_meta}" + raise e + + # mpi_config.comm.barrier() + + def calc_slope(self, esmf_nc: netCDF4.Dataset) -> tuple: + """Calculate slope grids needed for incoming shortwave radiation downscaling. + + Function to calculate slope grids needed for incoming shortwave radiation downscaling + later during the program. + :param esmf_nc: The open netCDF4 dataset for the geogrid file, passed in to avoid having to reopen the file multiple times + :return: A tuple containing slope and slope azimuth for nodes and elements + """ + # First extract the sina,cosa, and elevation variables from the geogrid file. + try: + sina_grid = esmf_nc.variables[self.config_options.sinalpha_var][0, :, :] + except Exception as e: + self.config_options.errMsg = ( + f"Unable to extract SINALPHA from: {self.config_options.geogrid}" + ) + raise e + + try: + cosa_grid = esmf_nc.variables[self.config_options.cosalpha_var][0, :, :] + except Exception as e: + self.config_options.errMsg = ( + f"Unable to extract COSALPHA from: {self.config_options.geogrid}" + ) + raise e + + try: + height_dest = esmf_nc.variables[self.config_options.hgt_var][0, :, :] + except Exception as e: + self.config_options.errMsg = ( + f"Unable to extract HGT_M from: {self.config_options.geogrid}" + ) + raise e + + # Ensure cosa/sina are correct dimensions + if sina_grid.shape[0] != self.ny_global or sina_grid.shape[1] != self.nx_global: + self.config_options.errMsg = ( + f"SINALPHA dimensions mismatch in: {self.config_options.geogrid}" + ) + raise Exception + if cosa_grid.shape[0] != self.ny_global or cosa_grid.shape[1] != self.nx_global: + self.config_options.errMsg = ( + f"COSALPHA dimensions mismatch in: {self.config_options.geogrid}" + ) + raise Exception + if ( + height_dest.shape[0] != self.ny_global + or height_dest.shape[1] != self.nx_global + ): + self.config_options.errMsg = ( + f"HGT_M dimension mismatch in: {self.config_options.geogrid}" + ) + raise Exception + + # Establish constants + rdx = 1.0 / self.dx_meters + rdy = 1.0 / self.dy_meters + msftx = 1.0 + msfty = 1.0 + + slope_out = np.empty([self.ny_global, self.nx_global], np.float32) + toposlpx = np.empty([self.ny_global, self.nx_global], np.float32) + toposlpy = np.empty([self.ny_global, self.nx_global], np.float32) + slp_azi = np.empty([self.ny_global, self.nx_global], np.float32) + ip_diff = np.empty([self.ny_global, self.nx_global], np.int32) + jp_diff = np.empty([self.ny_global, self.nx_global], np.int32) + hx = np.empty([self.ny_global, self.nx_global], np.float32) + hy = np.empty([self.ny_global, self.nx_global], np.float32) + + # Create index arrays that will be used to calculate slope. + x_tmp = np.arange(self.nx_global) + y_tmp = np.arange(self.ny_global) + x_grid = np.tile(x_tmp[:], (self.ny_global, 1)) + y_grid = np.repeat(y_tmp[:, np.newaxis], self.nx_global, axis=1) + ind_orig = np.where(height_dest == height_dest) + ind_ip1 = ((ind_orig[0]), (ind_orig[1] + 1)) + ind_im1 = ((ind_orig[0]), (ind_orig[1] - 1)) + ind_jp1 = ((ind_orig[0] + 1), (ind_orig[1])) + ind_jm1 = ((ind_orig[0] - 1), (ind_orig[1])) + ind_ip1[1][np.where(ind_ip1[1] >= self.nx_global)] = self.nx_global - 1 + ind_jp1[0][np.where(ind_jp1[0] >= self.ny_global)] = self.ny_global - 1 + ind_im1[1][np.where(ind_im1[1] < 0)] = 0 + ind_jm1[0][np.where(ind_jm1[0] < 0)] = 0 + + ip_diff[ind_orig] = x_grid[ind_ip1] - x_grid[ind_im1] + jp_diff[ind_orig] = y_grid[ind_jp1] - y_grid[ind_jm1] + + toposlpx[ind_orig] = ( + (height_dest[ind_ip1] - height_dest[ind_im1]) * msftx * rdx + ) / ip_diff[ind_orig] + toposlpy[ind_orig] = ( + (height_dest[ind_jp1] - height_dest[ind_jm1]) * msfty * rdy + ) / jp_diff[ind_orig] + hx[ind_orig] = toposlpx[ind_orig] + hy[ind_orig] = toposlpy[ind_orig] + slope_out[ind_orig] = np.arctan((hx[ind_orig] ** 2 + hy[ind_orig] ** 2) ** 0.5) + slope_out[np.where(slope_out < 1e-4)] = 0.0 + slp_azi[np.where(slope_out < 1e-4)] = 0.0 + ind_valesmf_nc = np.where(slope_out >= 1e-4) + slp_azi[ind_valesmf_nc] = ( + np.arctan2(hx[ind_valesmf_nc], hy[ind_valesmf_nc]) + math.pi + ) + ind_valesmf_nc = np.where(cosa_grid >= 0.0) + slp_azi[ind_valesmf_nc] = slp_azi[ind_valesmf_nc] - np.arcsin( + sina_grid[ind_valesmf_nc] + ) + ind_valesmf_nc = np.where(cosa_grid < 0.0) + slp_azi[ind_valesmf_nc] = slp_azi[ind_valesmf_nc] - ( + math.pi - np.arcsin(sina_grid[ind_valesmf_nc]) + ) + + # Reset temporary arrays to None to free up memory + toposlpx = None + toposlpy = None + height_dest = None + sina_grid = None + cosa_grid = None + ind_valesmf_nc = None + x_tmp = None + y_tmp = None + x_grid = None + ip_diff = None + jp_diff = None + ind_orig = None + ind_jm1 = None + ind_jp1 = None + ind_im1 = None + ind_ip1 = None + hx = None + hy = None + + return slope_out, slp_azi + + +class GriddedGeoMeta(GeoMeta): + """Class for handling information about the gridded domain we are processing forcings to.""" + + def __init__(self, config_options: ConfigOptions, mpi_config: MpiConfig): """Initialize GeoMetaWrfHydro class variables. Initialization function to initialize ESMF through ESMPy, @@ -116,126 +341,106 @@ def initialize_destination_geo_gridded(self, config_options, mpi_config): for this particular processor. :return: """ + super().__init__(config_options, mpi_config) + self.nx_local_elem = None + self.ny_local_elem = None # Open the geogrid file and extract necessary information # to create ESMF fields. if mpi_config.rank == 0: try: - idTmp = netCDF4.Dataset(config_options.geogrid, "r") + esmf_nc = netCDF4.Dataset(self.config_options.geogrid, "r") except Exception as e: - config_options.errMsg = ( - "Unable to open the WRF-Hydro geogrid file: " - + config_options.geogrid - ) - raise Exception - if idTmp.variables[config_options.lat_var].ndim == 3: + self.config_options.errMsg = f"Unable to open the WRF-Hydro geogrid file: {self.config_options.geogrid}" + raise e + if esmf_nc.variables[self.config_options.lat_var].ndim == 3: try: - self.nx_global = idTmp.variables[config_options.lat_var].shape[2] + self.nx_global = esmf_nc.variables[ + self.config_options.lat_var + ].shape[2] except Exception as e: - config_options.errMsg = ( - "Unable to extract X dimension size from latitude variable in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract X dimension size from latitude variable in: {self.config_options.geogrid}" + raise e try: - self.ny_global = idTmp.variables[config_options.lat_var].shape[1] + self.ny_global = esmf_nc.variables[ + self.config_options.lat_var + ].shape[1] except Exception as e: - config_options.errMsg = ( - "Unable to extract Y dimension size from latitude in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract Y dimension size from latitude in: {self.config_options.geogrid}" + raise e try: - self.dx_meters = idTmp.DX + self.dx_meters = esmf_nc.DX except Exception as e: - config_options.errMsg = ( - "Unable to extract DX global attribute in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract DX global attribute in: {self.config_options.geogrid}" + raise e try: - self.dy_meters = idTmp.DY + self.dy_meters = esmf_nc.DY except Exception as e: - config_options.errMsg = ( - "Unable to extract DY global attribute in: " - + config_options.geogrid - ) - raise Exception - elif idTmp.variables[config_options.lat_var].ndim == 2: + self.config_options.errMsg = f"Unable to extract DY global attribute in: {self.config_options.geogrid}" + raise e + elif esmf_nc.variables[self.config_options.lat_var].ndim == 2: try: - self.nx_global = idTmp.variables[config_options.lat_var].shape[1] + self.nx_global = esmf_nc.variables[ + self.config_options.lat_var + ].shape[1] except Exception as e: - config_options.errMsg = ( - "Unable to extract X dimension size from latitude variable in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract X dimension size from latitude variable in: {self.config_options.geogrid}" + raise e try: - self.ny_global = idTmp.variables[config_options.lat_var].shape[0] + self.ny_global = esmf_nc.variables[ + self.config_options.lat_var + ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract Y dimension size from latitude in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract Y dimension size from latitude in: {self.config_options.geogrid}" + raise e try: - self.dx_meters = idTmp.variables[config_options.lon_var].dx + self.dx_meters = esmf_nc.variables[self.config_options.lon_var].dx except Exception as e: - config_options.errMsg = ( - "Unable to extract DX global attribute in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract DX global attribute in: {self.config_options.geogrid}" + raise e try: - self.dy_meters = idTmp.variables[config_options.lat_var].dy + self.dy_meters = esmf_nc.variables[self.config_options.lat_var].dy except Exception as e: - config_options.errMsg = ( - "Unable to extract DY global attribute in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract DY global attribute in: {self.config_options.geogrid}" + raise e else: try: - self.nx_global = idTmp.variables[config_options.lon_var].shape[0] + self.nx_global = esmf_nc.variables[ + self.config_options.lon_var + ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract X dimension size from longitude variable in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract X dimension size from longitude variable in: {self.config_options.geogrid}" + raise e try: - self.ny_global = idTmp.variables[config_options.lat_var].shape[0] + self.ny_global = esmf_nc.variables[ + self.config_options.lat_var + ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract Y dimension size from latitude in: " - + config_options.geogrid - ) - raise Exception - if config_options.input_forcings[0] != 23: + self.config_options.errMsg = f"Unable to extract Y dimension size from latitude in: {self.config_options.geogrid}" + raise e + if self.config_options.input_forcings[0] != 23: try: - self.dx_meters = idTmp.variables[config_options.lon_var].dx + self.dx_meters = esmf_nc.variables[ + self.config_options.lon_var + ].dx except Exception as e: - config_options.errMsg = ( - "Unable to extract dx metadata attribute in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract dx metadata attribute in: {self.config_options.geogrid}" + raise e try: - self.dy_meters = idTmp.variables[config_options.lat_var].dy + self.dy_meters = esmf_nc.variables[ + self.config_options.lat_var + ].dy except Exception as e: - config_options.errMsg = ( - "Unable to extract dy metadata attribute in: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract dy metadata attribute in: {self.config_options.geogrid}" + raise e else: # Manually input the grid spacing since ERA5-Interim does not # internally have this geospatial information within the netcdf file @@ -246,16 +451,16 @@ def initialize_destination_geo_gridded(self, config_options, mpi_config): # Broadcast global dimensions to the other processors. self.nx_global = mpi_config.broadcast_parameter( - self.nx_global, config_options, param_type=int + self.nx_global, self.config_options, param_type=int ) self.ny_global = mpi_config.broadcast_parameter( - self.ny_global, config_options, param_type=int + self.ny_global, self.config_options, param_type=int ) self.dx_meters = mpi_config.broadcast_parameter( - self.dx_meters, config_options, param_type=float + self.dx_meters, self.config_options, param_type=float ) self.dy_meters = mpi_config.broadcast_parameter( - self.dy_meters, config_options, param_type=float + self.dy_meters, self.config_options, param_type=float ) # mpi_config.comm.barrier() @@ -267,11 +472,8 @@ def initialize_destination_geo_gridded(self, config_options, mpi_config): coord_sys=ESMF.CoordSys.SPH_DEG, ) except Exception as e: - config_options.errMsg = ( - "Unable to create ESMF grid for WRF-Hydro geogrid: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to create ESMF grid for WRF-Hydro geogrid: {self.config_options.geogrid}" + raise e # mpi_config.comm.barrier() @@ -280,605 +482,512 @@ def initialize_destination_geo_gridded(self, config_options, mpi_config): # mpi_config.comm.barrier() - # Obtain the local boundaries for this processor. - self.get_processor_bounds(config_options) - # Scatter global XLAT_M grid to processors.. if mpi_config.rank == 0: - if idTmp.variables[config_options.lat_var].ndim == 3: - varTmp = idTmp.variables[config_options.lat_var][0, :, :] - elif idTmp.variables[config_options.lat_var].ndim == 2: - varTmp = idTmp.variables[config_options.lat_var][:, :] - elif idTmp.variables[config_options.lat_var].ndim == 1: - lat = idTmp.variables[config_options.lat_var][:] - lon = idTmp.variables[config_options.lon_var][:] - varTmp = np.meshgrid(lon, lat)[1] + if esmf_nc.variables[self.config_options.lat_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.lat_var][0, :, :] + elif esmf_nc.variables[self.config_options.lat_var].ndim == 2: + var_tmp = esmf_nc.variables[self.config_options.lat_var][:, :] + elif esmf_nc.variables[self.config_options.lat_var].ndim == 1: + lat = esmf_nc.variables[self.config_options.lat_var][:] + lon = esmf_nc.variables[self.config_options.lon_var][:] + var_tmp = np.meshgrid(lon, lat)[1] lat = None lon = None # Flag to grab entire array for AWS slicing - if config_options.aws: - self.lat_bounds = varTmp + if self.config_options.aws: + self.lat_bounds = var_tmp else: - varTmp = None + var_tmp = None # mpi_config.comm.barrier() - varSubTmp = mpi_config.scatter_array(self, varTmp, config_options) + var_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) # mpi_config.comm.barrier() # Place the local lat/lon grid slices from the parent geogrid file into # the ESMF lat/lon grids. try: - self.esmf_lat[:, :] = varSubTmp - self.latitude_grid = varSubTmp - varSubTmp = None - varTmp = None + self.esmf_lat[:, :] = var_sub_tmp + self.latitude_grid = var_sub_tmp + var_sub_tmp = None + var_tmp = None except Exception as e: - config_options.errMsg = ( + self.config_options.errMsg = ( "Unable to subset latitude from geogrid file into ESMF object" ) - raise Exception + raise e # mpi_config.comm.barrier() # Scatter global XLONG_M grid to processors.. if mpi_config.rank == 0: - if idTmp.variables[config_options.lat_var].ndim == 3: - varTmp = idTmp.variables[config_options.lon_var][0, :, :] - elif idTmp.variables[config_options.lon_var].ndim == 2: - varTmp = idTmp.variables[config_options.lon_var][:, :] - elif idTmp.variables[config_options.lon_var].ndim == 1: - lat = idTmp.variables[config_options.lat_var][:] - lon = idTmp.variables[config_options.lon_var][:] - varTmp = np.meshgrid(lon, lat)[0] + if esmf_nc.variables[self.config_options.lat_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.lon_var][0, :, :] + elif esmf_nc.variables[self.config_options.lon_var].ndim == 2: + var_tmp = esmf_nc.variables[self.config_options.lon_var][:, :] + elif esmf_nc.variables[self.config_options.lon_var].ndim == 1: + lat = esmf_nc.variables[self.config_options.lat_var][:] + lon = esmf_nc.variables[self.config_options.lon_var][:] + var_tmp = np.meshgrid(lon, lat)[0] lat = None lon = None # Flag to grab entire array for AWS slicing - if config_options.aws: - self.lon_bounds = varTmp + if self.config_options.aws: + self.lon_bounds = var_tmp else: - varTmp = None + var_tmp = None # mpi_config.comm.barrier() - varSubTmp = mpi_config.scatter_array(self, varTmp, config_options) + var_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) # mpi_config.comm.barrier() try: - self.esmf_lon[:, :] = varSubTmp - self.longitude_grid = varSubTmp - varSubTmp = None - varTmp = None + self.esmf_lon[:, :] = var_sub_tmp + self.longitude_grid = var_sub_tmp + var_sub_tmp = None + var_tmp = None except Exception as e: - config_options.errMsg = ( + self.config_options.errMsg = ( "Unable to subset longitude from geogrid file into ESMF object" ) - raise Exception + raise e # mpi_config.comm.barrier() if ( - config_options.cosalpha_var is not None - and config_options.sinalpha_var is not None + self.config_options.cosalpha_var is not None + and self.config_options.sinalpha_var is not None ): # Scatter the COSALPHA,SINALPHA grids to the processors. if mpi_config.rank == 0: - if idTmp.variables[config_options.cosalpha_var].ndim == 3: - varTmp = idTmp.variables[config_options.cosalpha_var][0, :, :] + if esmf_nc.variables[self.config_options.cosalpha_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.cosalpha_var][ + 0, :, : + ] else: - varTmp = idTmp.variables[config_options.cosalpha_var][:, :] + var_tmp = esmf_nc.variables[self.config_options.cosalpha_var][:, :] else: - varTmp = None + var_tmp = None # mpi_config.comm.barrier() - varSubTmp = mpi_config.scatter_array(self, varTmp, config_options) + var_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) # mpi_config.comm.barrier() - self.cosa_grid = varSubTmp[:, :] - varSubTmp = None - varTmp = None + self.cosa_grid = var_sub_tmp[:, :] + var_sub_tmp = None + var_tmp = None if mpi_config.rank == 0: - if idTmp.variables[config_options.sinalpha_var].ndim == 3: - varTmp = idTmp.variables[config_options.sinalpha_var][0, :, :] + if esmf_nc.variables[self.config_options.sinalpha_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.sinalpha_var][ + 0, :, : + ] else: - varTmp = idTmp.variables[config_options.sinalpha_var][:, :] + var_tmp = esmf_nc.variables[self.config_options.sinalpha_var][:, :] else: - varTmp = None + var_tmp = None # mpi_config.comm.barrier() - varSubTmp = mpi_config.scatter_array(self, varTmp, config_options) + var_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) # mpi_config.comm.barrier() - self.sina_grid = varSubTmp[:, :] - varSubTmp = None - varTmp = None + self.sina_grid = var_sub_tmp[:, :] + var_sub_tmp = None + var_tmp = None - if config_options.hgt_var is not None: + if self.config_options.hgt_var is not None: # Read in a scatter the WRF-Hydro elevation, which is used for downscaling # purposes. if mpi_config.rank == 0: - if idTmp.variables[config_options.hgt_var].ndim == 3: - varTmp = idTmp.variables[config_options.hgt_var][0, :, :] + if esmf_nc.variables[self.config_options.hgt_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.hgt_var][0, :, :] else: - varTmp = idTmp.variables[config_options.hgt_var][:, :] + var_tmp = esmf_nc.variables[self.config_options.hgt_var][:, :] else: - varTmp = None + var_tmp = None # mpi_config.comm.barrier() - varSubTmp = mpi_config.scatter_array(self, varTmp, config_options) + var_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) # mpi_config.comm.barrier() - self.height = varSubTmp - varSubTmp = None - varTmp = None + self.height = var_sub_tmp + var_sub_tmp = None + var_tmp = None if ( - config_options.cosalpha_var is not None - and config_options.sinalpha_var is not None + self.config_options.cosalpha_var is not None + and self.config_options.sinalpha_var is not None ): # Calculate the slope from the domain using elevation on the WRF-Hydro domain. This will # be used for downscaling purposes. if mpi_config.rank == 0: try: - slopeTmp, slp_azi_tmp = self.calc_slope(idTmp, config_options) + slope_tmp, slp_azi_tmp = self.calc_slope(esmf_nc) except Exception: raise Exception else: - slopeTmp = None + slope_tmp = None slp_azi_tmp = None # mpi_config.comm.barrier() - slopeSubTmp = mpi_config.scatter_array(self, slopeTmp, config_options) - self.slope = slopeSubTmp[:, :] - slopeSubTmp = None + slope_sub_tmp = mpi_config.scatter_array( + self, slope_tmp, self.config_options + ) + self.slope = slope_sub_tmp[:, :] + slope_sub_tmp = None - slp_azi_sub = mpi_config.scatter_array(self, slp_azi_tmp, config_options) + slp_azi_sub = mpi_config.scatter_array( + self, slp_azi_tmp, self.config_options + ) self.slp_azi = slp_azi_sub[:, :] slp_azi_tmp = None elif ( - config_options.slope_var is not None - and config_options.slope_azimuth_var is not None + self.config_options.slope_var is not None + and self.config_options.slope_azimuth_var is not None ): if mpi_config.rank == 0: - if idTmp.variables[config_options.slope_var].ndim == 3: - varTmp = idTmp.variables[config_options.slope_var][0, :, :] + if esmf_nc.variables[self.config_options.slope_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.slope_var][0, :, :] else: - varTmp = idTmp.variables[config_options.slope_var][:, :] + var_tmp = esmf_nc.variables[self.config_options.slope_var][:, :] else: - varTmp = None + var_tmp = None - slopeSubTmp = mpi_config.scatter_array(self, varTmp, config_options) - self.slope = slopeSubTmp - varTmp = None + slope_sub_tmp = mpi_config.scatter_array(self, var_tmp, self.config_options) + self.slope = slope_sub_tmp + var_tmp = None if mpi_config.rank == 0: - if idTmp.variables[config_options.slope_azimuth_var].ndim == 3: - varTmp = idTmp.variables[config_options.slope_azimuth_var][0, :, :] + if esmf_nc.variables[self.config_options.slope_azimuth_var].ndim == 3: + var_tmp = esmf_nc.variables[self.config_options.slope_azimuth_var][ + 0, :, : + ] else: - varTmp = idTmp.variables[config_options.slope_azimuth_var][:, :] + var_tmp = esmf_nc.variables[self.config_options.slope_azimuth_var][ + :, : + ] else: - varTmp = None + var_tmp = None - slp_azi_sub = mpi_config.scatter_array(self, varTmp, config_options) + slp_azi_sub = mpi_config.scatter_array(self, var_tmp, self.config_options) self.slp_azi = slp_azi_sub[:, :] - varTmp = None + var_tmp = None - elif config_options.hgt_var is not None: + elif self.config_options.hgt_var is not None: # Calculate the slope from the domain using elevation of the gridded model and other approximations if mpi_config.rank == 0: try: - slopeTmp, slp_azi_tmp = self.calc_slope_gridded( - idTmp, config_options - ) + slope_tmp, slp_azi_tmp = self.calc_slope_gridded(esmf_nc) except Exception: raise Exception else: - slopeTmp = None + slope_tmp = None slp_azi_tmp = None # mpi_config.comm.barrier() - slopeSubTmp = mpi_config.scatter_array(self, slopeTmp, config_options) - self.slope = slopeSubTmp[:, :] - slopeSubTmp = None + slope_sub_tmp = mpi_config.scatter_array( + self, slope_tmp, self.config_options + ) + self.slope = slope_sub_tmp[:, :] + slope_sub_tmp = None - slp_azi_sub = mpi_config.scatter_array(self, slp_azi_tmp, config_options) + slp_azi_sub = mpi_config.scatter_array( + self, slp_azi_tmp, self.config_options + ) self.slp_azi = slp_azi_sub[:, :] slp_azi_tmp = None if mpi_config.rank == 0: # Close the geogrid file try: - idTmp.close() + esmf_nc.close() except Exception as e: - config_options.errMsg = ( - "Unable to close geogrid file: " + config_options.geogrid + self.config_options.errMsg = ( + f"Unable to close geogrid file: {self.config_options.geogrid}" ) - raise Exception + raise e # Reset temporary variables to free up memory - slopeTmp = None + slope_tmp = None slp_azi_tmp = None - varTmp = None + var_tmp = None - def initialize_geospatial_metadata(self, config_options, mpi_config): - """Initialize GeoMetaWrfHydro class variables. + def calc_slope_gridded(self, esmf_nc: netCDF4.Dataset) -> tuple: + """Calculate slope grids needed for incoming shortwave radiation downscaling. - Function that will read in crs/x/y geospatial metadata and coordinates - from the optional geospatial metadata file IF it was specified by the user in - the configuration file. - :param config_options: - :return: + Function to calculate slope grids needed for incoming shortwave radiation downscaling + later during the program. This calculates the slopes for grid cells + :param esmf_nc: The open netCDF4 dataset for the geogrid file, passed in to avoid having to reopen the file multiple times + :return: A tuple containing slope and slope azimuth for grid cells """ - # We will only read information on processor 0. This data is not necessary for the - # other processors, and is only used in the output routines. - if mpi_config.rank == 0: - # Open the geospatial metadata file. - try: - idTmp = netCDF4.Dataset(config_options.spatial_meta, "r") - except Exception as e: - config_options.errMsg = ( - "Unable to open spatial metadata file: " - + config_options.spatial_meta - ) - raise Exception + esmf_nc = netCDF4.Dataset(self.config_options.geogrid, "r") - # Make sure the expected variables are present in the file. - if "crs" not in idTmp.variables.keys(): - config_options.errMsg = ( - "Unable to locate crs variable in: " + config_options.spatial_meta - ) - raise Exception - if "x" not in idTmp.variables.keys(): - config_options.errMsg = ( - "Unable to locate x variable in: " + config_options.spatial_meta - ) - raise Exception - if "y" not in idTmp.variables.keys(): - config_options.errMsg = ( - "Unable to locate y variable in: " + config_options.spatial_meta - ) - raise Exception - # Extract names of variable attributes from each of the input geospatial variables. These - # can change, so we are making this as flexible as possible to accomodate future changes. - try: - crs_att_names = idTmp.variables["crs"].ncattrs() - except Exception as e: - config_options.errMsg = ( - "Unable to extract crs attribute names from: " - + config_options.spatial_meta - ) - raise Exception - try: - x_coord_att_names = idTmp.variables["x"].ncattrs() - except Exception as e: - config_options.errMsg = ( - "Unable to extract x attribute names from: " - + config_options.spatial_meta - ) - raise Exception - try: - y_coord_att_names = idTmp.variables["y"].ncattrs() - except Exception as e: - config_options.errMsg = ( - "Unable to extract y attribute names from: " - + config_options.spatial_meta - ) - raise Exception - # Extract attribute values - try: - self.x_coord_atts = { - item: idTmp.variables["x"].getncattr(item) - for item in x_coord_att_names - } - except Exception as e: - config_options.errMsg = ( - "Unable to extract x coordinate attributes from: " - + config_options.spatial_meta - ) - raise Exception - try: - self.y_coord_atts = { - item: idTmp.variables["y"].getncattr(item) - for item in y_coord_att_names - } - except Exception as e: - config_options.errMsg = ( - "Unable to extract y coordinate attributes from: " - + config_options.spatial_meta - ) - raise Exception - try: - self.crs_atts = { - item: idTmp.variables["crs"].getncattr(item) - for item in crs_att_names - } - except Exception as e: - config_options.errMsg = ( - "Unable to extract crs coordinate attributes from: " - + config_options.spatial_meta - ) - raise Exception + try: + lons = esmf_nc.variables[self.config_options.lon_var][:] + lats = esmf_nc.variables[self.config_options.lat_var][:] + except Exception as e: + self.config_options.errMsg = f"Unable to extract gridded coordinates in {self.config_options.geogrid}" + raise e + try: + dx = np.empty( + ( + esmf_nc.variables[self.config_options.lat_var].shape[0], + esmf_nc.variables[self.config_options.lon_var].shape[0], + ), + dtype=float, + ) + dy = np.empty( + ( + esmf_nc.variables[self.config_options.lat_var].shape[0], + esmf_nc.variables[self.config_options.lon_var].shape[0], + ), + dtype=float, + ) + dx[:] = esmf_nc.variables[self.config_options.lon_var].dx + dy[:] = esmf_nc.variables[self.config_options.lat_var].dy + except Exception as e: + self.config_options.errMsg = f"Unable to extract dx and dy distances in {self.config_options.geogrid}" + raise e + try: + heights = esmf_nc.variables[self.config_options.hgt_var][:] + except Exception as e: + self.config_options.errMsg = f"Unable to extract heights of grid cells in {self.config_options.geogrid}" + raise e - # Extract global attributes - try: - global_att_names = idTmp.ncattrs() - except Exception as e: - config_options.errMsg = ( - "Unable to extract global attribute names from: " - + config_options.spatial_meta - ) - raise Exception - try: - self.spatial_global_atts = { - item: idTmp.getncattr(item) for item in global_att_names - } - except Exception as e: - config_options.errMsg = ( - "Unable to extract global attributes from: " - + config_options.spatial_meta - ) - raise Exception + esmf_nc.close() - # Extract x/y coordinate values - if len(idTmp.variables["x"].shape) == 1: - try: - self.x_coords = idTmp.variables["x"][:].data - except Exception as e: - config_options.errMsg = ( - "Unable to extract x coordinate values from: " - + config_options.spatial_meta - ) - raise Exception - try: - self.y_coords = idTmp.variables["y"][:].data - except Exception as e: - config_options.errMsg = ( - "Unable to extract y coordinate values from: " - + config_options.spatial_meta - ) - raise Exception - # Check to see if the Y coordinates are North-South. If so, flip them. - if self.y_coords[1] < self.y_coords[0]: - self.y_coords[:] = np.flip(self.y_coords[:], axis=0) - - if len(idTmp.variables["x"].shape) == 2: - try: - self.x_coords = idTmp.variables["x"][:, :].data - except Exception as e: - config_options.errMsg = ( - "Unable to extract x coordinate values from: " - + config_options.spatial_meta - ) - raise Exception - try: - self.y_coords = idTmp.variables["y"][:, :].data - except Exception as e: - config_options.errMsg = ( - "Unable to extract y coordinate values from: " - + config_options.spatial_meta - ) - raise Exception - # Check to see if the Y coordinates are North-South. If so, flip them. - if self.y_coords[1, 0] > self.y_coords[0, 0]: - self.y_coords[:, :] = np.flipud(self.y_coords[:, :]) - - # Close the geospatial metadata file. - try: - idTmp.close() - except Exception as e: - config_options.errMsg = ( - "Unable to close spatial metadata file: " - + config_options.spatial_meta - ) - raise Exception + # calculate grid coordinates dx distances in meters + # based on general geospatial formula approximations + # on a spherical grid + dz_init = np.diff(heights, axis=0) + dz = np.empty(dx.shape, dtype=float) + dz[0 : dz_init.shape[0], 0 : dz_init.shape[1]] = dz_init + dz[dz_init.shape[0] :, :] = dz_init[-1, :] - # mpi_config.comm.barrier() + slope = dz / np.sqrt((dx**2) + (dy**2)) + slp_azi = (180 / np.pi) * np.arctan(dx / dy) - def calc_slope(self, idTmp, config_options): - """Calculate slope grids needed for incoming shortwave radiation downscaling. + # Reset temporary arrays to None to free up memory + lons = None + lats = None + heights = None + dx = None + dy = None + dz = None - Function to calculate slope grids needed for incoming shortwave radiation downscaling - later during the program. - :param idTmp: - :param config_options: - :return: - """ - # First extract the sina,cosa, and elevation variables from the geogrid file. - try: - sinaGrid = idTmp.variables[config_options.sinalpha_var][0, :, :] - except Exception as e: - config_options.errMsg = ( - "Unable to extract SINALPHA from: " + config_options.geogrid - ) - raise + return slope, slp_azi - try: - cosaGrid = idTmp.variables[config_options.cosalpha_var][0, :, :] - except Exception as e: - config_options.errMsg = ( - "Unable to extract COSALPHA from: " + config_options.geogrid - ) - raise + @property + def x_lower_bound(self) -> float: + """Get the local x lower bound for this processor.""" + return self.esmf_grid.lower_bounds[ESMF.StaggerLoc.CENTER][1] - try: - heightDest = idTmp.variables[config_options.hgt_var][0, :, :] - except Exception as e: - config_options.errMsg = ( - "Unable to extract HGT_M from: " + config_options.geogrid - ) - raise + @property + def x_upper_bound(self) -> float: + """Get the local x upper bound for this processor.""" + return self.esmf_grid.upper_bounds[ESMF.StaggerLoc.CENTER][1] - # Ensure cosa/sina are correct dimensions - if sinaGrid.shape[0] != self.ny_global or sinaGrid.shape[1] != self.nx_global: - config_options.errMsg = ( - "SINALPHA dimensions mismatch in: " + config_options.geogrid - ) - raise Exception - if cosaGrid.shape[0] != self.ny_global or cosaGrid.shape[1] != self.nx_global: - config_options.errMsg = ( - "COSALPHA dimensions mismatch in: " + config_options.geogrid - ) - raise Exception - if ( - heightDest.shape[0] != self.ny_global - or heightDest.shape[1] != self.nx_global - ): - config_options.errMsg = ( - "HGT_M dimension mismatch in: " + config_options.geogrid - ) - raise Exception + @property + def y_lower_bound(self) -> float: + """Get the local y lower bound for this processor.""" + return self.esmf_grid.lower_bounds[ESMF.StaggerLoc.CENTER][0] - # Establish constants - rdx = 1.0 / self.dx_meters - rdy = 1.0 / self.dy_meters - msftx = 1.0 - msfty = 1.0 + @property + def y_upper_bound(self) -> float: + """Get the local y upper bound for this processor.""" + return self.esmf_grid.upper_bounds[ESMF.StaggerLoc.CENTER][0] - slopeOut = np.empty([self.ny_global, self.nx_global], np.float32) - toposlpx = np.empty([self.ny_global, self.nx_global], np.float32) - toposlpy = np.empty([self.ny_global, self.nx_global], np.float32) - slp_azi = np.empty([self.ny_global, self.nx_global], np.float32) - ipDiff = np.empty([self.ny_global, self.nx_global], np.int32) - jpDiff = np.empty([self.ny_global, self.nx_global], np.int32) - hx = np.empty([self.ny_global, self.nx_global], np.float32) - hy = np.empty([self.ny_global, self.nx_global], np.float32) + @property + def nx_local(self) -> int: + """Get the local x dimension size for this processor.""" + return self.x_upper_bound - self.x_lower_bound - # Create index arrays that will be used to calculate slope. - xTmp = np.arange(self.nx_global) - yTmp = np.arange(self.ny_global) - xGrid = np.tile(xTmp[:], (self.ny_global, 1)) - yGrid = np.repeat(yTmp[:, np.newaxis], self.nx_global, axis=1) - indOrig = np.where(heightDest == heightDest) - indIp1 = ((indOrig[0]), (indOrig[1] + 1)) - indIm1 = ((indOrig[0]), (indOrig[1] - 1)) - indJp1 = ((indOrig[0] + 1), (indOrig[1])) - indJm1 = ((indOrig[0] - 1), (indOrig[1])) - indIp1[1][np.where(indIp1[1] >= self.nx_global)] = self.nx_global - 1 - indJp1[0][np.where(indJp1[0] >= self.ny_global)] = self.ny_global - 1 - indIm1[1][np.where(indIm1[1] < 0)] = 0 - indJm1[0][np.where(indJm1[0] < 0)] = 0 - - ipDiff[indOrig] = xGrid[indIp1] - xGrid[indIm1] - jpDiff[indOrig] = yGrid[indJp1] - yGrid[indJm1] - - toposlpx[indOrig] = ( - (heightDest[indIp1] - heightDest[indIm1]) * msftx * rdx - ) / ipDiff[indOrig] - toposlpy[indOrig] = ( - (heightDest[indJp1] - heightDest[indJm1]) * msfty * rdy - ) / jpDiff[indOrig] - hx[indOrig] = toposlpx[indOrig] - hy[indOrig] = toposlpy[indOrig] - slopeOut[indOrig] = np.arctan((hx[indOrig] ** 2 + hy[indOrig] ** 2) ** 0.5) - slopeOut[np.where(slopeOut < 1e-4)] = 0.0 - slp_azi[np.where(slopeOut < 1e-4)] = 0.0 - indValidTmp = np.where(slopeOut >= 1e-4) - slp_azi[indValidTmp] = np.arctan2(hx[indValidTmp], hy[indValidTmp]) + math.pi - indValidTmp = np.where(cosaGrid >= 0.0) - slp_azi[indValidTmp] = slp_azi[indValidTmp] - np.arcsin(sinaGrid[indValidTmp]) - indValidTmp = np.where(cosaGrid < 0.0) - slp_azi[indValidTmp] = slp_azi[indValidTmp] - ( - math.pi - np.arcsin(sinaGrid[indValidTmp]) - ) + @property + def ny_local(self) -> int: + """Get the local y dimension size for this processor.""" + return self.y_upper_bound - self.y_lower_bound - # Reset temporary arrays to None to free up memory - toposlpx = None - toposlpy = None - heightDest = None - sinaGrid = None - cosaGrid = None - indValidTmp = None - xTmp = None - yTmp = None - xGrid = None - ipDiff = None - jpDiff = None - indOrig = None - indJm1 = None - indJp1 = None - indIm1 = None - indIp1 = None - hx = None - hy = None - return slopeOut, slp_azi +class HydrofabricGeoMeta(GeoMeta): + """Class for handling information about the unstructured hydrofabric domain we are processing forcings to.""" - def calc_slope_gridded(self, idTmp, config_options): - """Calculate slope grids needed for incoming shortwave radiation downscaling. + def __init__(self, config_options: ConfigOptions, mpi_config: MpiConfig): + """Initialize GeoMetaWrfHydro class variables. - Function to calculate slope grids needed for incoming shortwave radiation downscaling - later during the program. This calculates the slopes for grid cells - :param idTmp: - :param config_options: + Initialization function to initialize ESMF through ESMPy, + calculate the global parameters of the WRF-Hydro grid + being processed to, along with the local parameters + for this particular processor. :return: """ - idTmp = netCDF4.Dataset(config_options.geogrid, "r") + super().__init__(config_options, mpi_config) + self.nx_local_elem = None + self.ny_local_elem = None + self.x_lower_bound = None + self.x_upper_bound = None + self.y_lower_bound = None + self.y_upper_bound = None + if self.config_options.geogrid is not None: + # Phase 1: Rank 0 extracts all needed global data + if self.mpi_config.rank == 0: + try: + esmf_nc = nc_utils.nc_Dataset_retry( + self.mpi_config, + self.config_options, + err_handler, + self.config_options.geogrid, + "r", + ) - try: - lons = idTmp.variables[config_options.lon_var][:] - lats = idTmp.variables[config_options.lat_var][:] - except Exception as e: - config_options.errMsg = ( - "Unable to extract gridded coordinates in " + config_options.geogrid - ) - raise Exception - try: - dx = np.empty( - ( - idTmp.variables[config_options.lat_var].shape[0], - idTmp.variables[config_options.lon_var].shape[0], - ), - dtype=float, + # Extract everything we need with retries + tmp_vars = esmf_nc.variables + + if self.config_options.aws: + nodecoords_data = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.nodecoords_var], + ) + self.lat_bounds = nodecoords_data[:, 1] + self.lon_bounds = nodecoords_data[:, 0] + + # Store these for later broadcast/scatter + elementcoords_global = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.elemcoords_var], + ) + + self.nx_global = elementcoords_global.shape[0] + self.ny_global = self.nx_global + + element_ids_global = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.element_id_var], + ) + + heights_global = None + if self.config_options.hgt_var is not None: + heights_global = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.hgt_var], + ) + slopes_global = None + slp_azi_global = None + if self.config_options.slope_var is not None: + slopes_global = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.slope_var], + ) + if self.config_options.slope_azimuth_var is not None: + slp_azi_global = nc_utils.nc_read_var_retry( + self.mpi_config, + self.config_options, + err_handler, + tmp_vars[self.config_options.slope_azimuth_var], + ) + + except Exception as e: + LOG.critical( + f"Failed to open mesh file: {self.config_options.geogrid} " + f"due to {str(e)}" + ) + raise + finally: + esmf_nc.close() + else: + elementcoords_global = None + element_ids_global = None + heights_global = None + slopes_global = None + slp_azi_global = None + + # Broadcast dimensions + self.nx_global = self.mpi_config.broadcast_parameter( + self.nx_global, self.config_options, param_type=int ) - dy = np.empty( - ( - idTmp.variables[config_options.lat_var].shape[0], - idTmp.variables[config_options.lon_var].shape[0], - ), - dtype=float, + self.ny_global = self.mpi_config.broadcast_parameter( + self.ny_global, self.config_options, param_type=int ) - dx[:] = idTmp.variables[config_options.lon_var].dx - dy[:] = idTmp.variables[config_options.lat_var].dy - except Exception as e: - config_options.errMsg = ( - "Unable to extract dx and dy distances in " + config_options.geogrid + + self.mpi_config.comm.barrier() + + # Phase 2: Create ESMF Mesh (collective operation with retry) + try: + self.esmf_grid = esmf_utils.esmf_mesh_retry( + self.mpi_config, + self.config_options, + err_handler, + filename=self.config_options.geogrid, + filetype=ESMF.FileFormat.ESMFMESH, + ) + except Exception as e: + LOG.critical( + f"Unable to create ESMF Mesh: {self.config_options.geogrid} " + f"due to {str(e)}" + ) + raise e + + # Extract local coordinates from ESMF mesh + self.latitude_grid = self.esmf_grid.coords[1][1] + self.longitude_grid = self.esmf_grid.coords[1][0] + + # Phase 3: Broadcast global arrays and compute local indices + elementcoords_global = self.mpi_config.comm.bcast( + elementcoords_global, root=0 ) - raise Exception - try: - heights = idTmp.variables[config_options.hgt_var][:] - except Exception as e: - config_options.errMsg = ( - "Unable to extract heights of grid cells in " + config_options.geogrid + element_ids_global = self.mpi_config.comm.bcast(element_ids_global, root=0) + + # Each rank computes its own local indices + pet_elementcoords = np.column_stack( + [self.longitude_grid, self.latitude_grid] ) - raise Exception + tree = spatial.KDTree(elementcoords_global) + _, pet_element_inds = tree.query(pet_elementcoords) - idTmp.close() + self.element_ids = element_ids_global[pet_element_inds] + self.element_ids_global = element_ids_global - # calculate grid coordinates dx distances in meters - # based on general geospatial formula approximations - # on a spherical grid - dz_init = np.diff(heights, axis=0) - dz = np.empty(dx.shape, dtype=float) - dz[0 : dz_init.shape[0], 0 : dz_init.shape[1]] = dz_init - dz[dz_init.shape[0] :, :] = dz_init[-1, :] + # Broadcast and extract height/slope data + if self.config_options.hgt_var is not None: + heights_global = self.mpi_config.comm.bcast(heights_global, root=0) + self.height = heights_global[pet_element_inds] - slope = dz / np.sqrt((dx**2) + (dy**2)) - slp_azi = (180 / np.pi) * np.arctan(dx / dy) + if self.config_options.slope_var is not None: + slopes_global = self.mpi_config.comm.bcast(slopes_global, root=0) + slp_azi_global = self.mpi_config.comm.bcast(slp_azi_global, root=0) + self.slope = slopes_global[pet_element_inds] + self.slp_azi = slp_azi_global[pet_element_inds] - # Reset temporary arrays to None to free up memory - lons = None - lats = None - heights = None - dx = None - dy = None - dz = None + self.mesh_inds = pet_element_inds - return slope, slp_azi + @property + def nx_local(self) -> int: + """Get the local x dimension size for this processor.""" + return len(self.esmf_grid.coords[1][1]) + + @property + def ny_local(self) -> int: + """Get the local y dimension size for this processor.""" + return len(self.esmf_grid.coords[1][1]) - def initialize_destination_geo_unstructured(self, config_options, mpi_config): + +class UnstructuredGeoMeta(GeoMeta): + """Class for handling information about the unstructured domain we are processing forcings to.""" + + def __init__(self, config_options: ConfigOptions, mpi_config: MpiConfig): """Initialize GeoMetaWrfHydro class variables. Initialization function to initialize ESMF through ESMPy, @@ -887,77 +996,76 @@ def initialize_destination_geo_unstructured(self, config_options, mpi_config): for this particular processor. :return: """ + super().__init__(config_options, mpi_config) + + self.x_lower_bound = None + self.x_upper_bound = None + self.y_lower_bound = None + self.y_upper_bound = None # Open the geogrid file and extract necessary information # to create ESMF fields. if mpi_config.rank == 0: try: - idTmp = netCDF4.Dataset(config_options.geogrid, "r") + esmf_nc = netCDF4.Dataset(self.config_options.geogrid, "r") except Exception as e: - config_options.errMsg = ( - "Unable to open the unstructured mesh file: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to open the unstructured mesh file: {self.config_options.geogrid}" + raise e try: - self.nx_global = idTmp.variables[config_options.nodecoords_var].shape[0] + self.nx_global = esmf_nc.variables[ + self.config_options.nodecoords_var + ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract X dimension size in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract X dimension size in {self.config_options.geogrid}" + raise e try: - self.ny_global = idTmp.variables[config_options.nodecoords_var].shape[0] + self.ny_global = esmf_nc.variables[ + self.config_options.nodecoords_var + ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract Y dimension size in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract Y dimension size in {self.config_options.geogrid}" + raise e try: - self.nx_global_elem = idTmp.variables[ - config_options.elemcoords_var + self.nx_global_elem = esmf_nc.variables[ + self.config_options.elemcoords_var ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract X dimension size in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract X dimension size in {self.config_options.geogrid}" + raise e try: - self.ny_global_elem = idTmp.variables[ - config_options.elemcoords_var + self.ny_global_elem = esmf_nc.variables[ + self.config_options.elemcoords_var ].shape[0] except Exception as e: - config_options.errMsg = ( - "Unable to extract Y dimension size in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract Y dimension size in {self.config_options.geogrid}" + raise e # Flag to grab entire array for AWS slicing - if config_options.aws: - self.lat_bounds = idTmp.variables[config_options.nodecoords_var][:][ - :, 1 - ] - self.lon_bounds = idTmp.variables[config_options.nodecoords_var][:][ - :, 0 - ] + if self.config_options.aws: + self.lat_bounds = esmf_nc.variables[self.config_options.nodecoords_var][ + : + ][:, 1] + self.lon_bounds = esmf_nc.variables[self.config_options.nodecoords_var][ + : + ][:, 0] # mpi_config.comm.barrier() # Broadcast global dimensions to the other processors. self.nx_global = mpi_config.broadcast_parameter( - self.nx_global, config_options, param_type=int + self.nx_global, self.config_options, param_type=int ) self.ny_global = mpi_config.broadcast_parameter( - self.ny_global, config_options, param_type=int + self.ny_global, self.config_options, param_type=int ) self.nx_global_elem = mpi_config.broadcast_parameter( - self.nx_global_elem, config_options, param_type=int + self.nx_global_elem, self.config_options, param_type=int ) self.ny_global_elem = mpi_config.broadcast_parameter( - self.ny_global_elem, config_options, param_type=int + self.ny_global_elem, self.config_options, param_type=int ) # mpi_config.comm.barrier() @@ -965,60 +1073,58 @@ def initialize_destination_geo_unstructured(self, config_options, mpi_config): if mpi_config.rank == 0: # Close the geogrid file try: - idTmp.close() + esmf_nc.close() except Exception as e: - config_options.errMsg = ( - "Unable to close geogrid Mesh file: " + config_options.geogrid + self.config_options.errMsg = ( + f"Unable to close geogrid Mesh file: {self.config_options.geogrid}" ) - raise Exception + raise e try: # Removed argument coord_sys=ESMF.CoordSys.SPH_DEG since we are always reading from a file # From ESMF documentation # If you create a mesh from a file (like NetCDF/ESMF-Mesh), coord_sys is ignored. The mesh’s coordinate system should be embedded in the file or inferred. self.esmf_grid = ESMF.Mesh( - filename=config_options.geogrid, filetype=ESMF.FileFormat.ESMFMESH + filename=self.config_options.geogrid, filetype=ESMF.FileFormat.ESMFMESH ) except Exception as e: - config_options.errMsg = ( - "Unable to create ESMF Mesh from geogrid file: " - + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to create ESMF Mesh from geogrid file: {self.config_options.geogrid}" + raise e # mpi_config.comm.barrier() - # Obtain the local boundaries for this processor. - self.get_processor_bounds(config_options) - # Place the local lat/lon grid slices from the parent geogrid file into # the ESMF lat/lon grids that have already been seperated by processors. try: self.latitude_grid = self.esmf_grid.coords[0][1] self.latitude_grid_elem = self.esmf_grid.coords[1][1] - varSubTmp = None - varTmp = None + var_sub_tmp = None + var_tmp = None except Exception as e: - config_options.errMsg = ( + self.config_options.errMsg = ( "Unable to subset node latitudes from ESMF Mesh object" ) - raise Exception + raise e try: self.longitude_grid = self.esmf_grid.coords[0][0] self.longitude_grid_elem = self.esmf_grid.coords[1][0] - varSubTmp = None - varTmp = None + var_sub_tmp = None + var_tmp = None except Exception as e: - config_options.errMsg = ( + self.config_options.errMsg = ( "Unable to subset XLONG_M from geogrid file into ESMF Mesh object" ) - raise Exception + raise e - idTmp = netCDF4.Dataset(config_options.geogrid, "r") + esmf_nc = netCDF4.Dataset(self.config_options.geogrid, "r") # Get lat and lon global variables for pet extraction of indices - nodecoords_global = idTmp.variables[config_options.nodecoords_var][:].data - elementcoords_global = idTmp.variables[config_options.elemcoords_var][:].data + nodecoords_global = esmf_nc.variables[self.config_options.nodecoords_var][ + : + ].data + elementcoords_global = esmf_nc.variables[self.config_options.elemcoords_var][ + : + ].data # Find the corresponding local indices to slice global heights and slope # variables that are based on the partitioning on the unstructured mesh @@ -1047,58 +1153,62 @@ def initialize_destination_geo_unstructured(self, config_options, mpi_config): # accepting the pre-calculated slope and slope azmiuth variables if available, # otherwise calculate slope from height estimates # if(config_options.cosalpha_var != None and config_options.sinalpha_var != None): - # self.cosa_grid = idTmp.variables[config_options.cosalpha_var][:].data[pet_node_inds] - # self.sina_grid = idTmp.variables[config_options.sinalpha_var][:].data[pet_node_inds] - # slopeTmp, slp_azi_tmp = self.calc_slope(idTmp,config_options) - # self.slope = slope_node_Tmp[pet_node_inds] + # self.cosa_grid = esmf_nc.variables[config_options.cosalpha_var][:].data[pet_node_inds] + # self.sina_grid = esmf_nc.variables[config_options.sinalpha_var][:].data[pet_node_inds] + # slope_tmp, slp_azi_tmp = self.calc_slope(esmf_nc,config_options) + # self.slope = slope_node_tmp[pet_node_inds] # self.slp_azi = slp_azi_node_tmp[pet_node_inds] if ( - config_options.slope_var is not None - and config_options.slp_azi_var is not None + self.config_options.slope_var is not None + and self.config_options.slp_azi_var is not None ): - self.slope = idTmp.variables[config_options.slope_var][:].data[ + self.slope = esmf_nc.variables[self.config_options.slope_var][:].data[ pet_node_inds ] - self.slp_azi = idTmp.variables[config_options.slope_azimuth_var][:].data[ - pet_node_inds - ] - self.slope_elem = idTmp.variables[config_options.slope_var_elem][:].data[ - pet_element_inds - ] - self.slp_azi_elem = idTmp.variables[config_options.slope_azimuth_var_elem][ + self.slp_azi = esmf_nc.variables[self.config_options.slope_azimuth_var][ + : + ].data[pet_node_inds] + self.slope_elem = esmf_nc.variables[self.config_options.slope_var_elem][ : ].data[pet_element_inds] + self.slp_azi_elem = esmf_nc.variables[ + self.config_options.slope_azimuth_var_elem + ][:].data[pet_element_inds] # Read in a scatter the mesh node elevation, which is used for downscaling purposes - self.height = idTmp.variables[config_options.hgt_var][:].data[pet_node_inds] - # Read in a scatter the mesh element elevation, which is used for downscaling purposes. - self.height_elem = idTmp.variables[config_options.hgt_elem_var][:].data[ - pet_element_inds + self.height = esmf_nc.variables[self.config_options.hgt_var][:].data[ + pet_node_inds ] + # Read in a scatter the mesh element elevation, which is used for downscaling purposes. + self.height_elem = esmf_nc.variables[self.config_options.hgt_elem_var][ + : + ].data[pet_element_inds] - elif config_options.hgt_var is not None: + elif self.config_options.hgt_var is not None: # Read in a scatter the mesh node elevation, which is used for downscaling purposes - self.height = idTmp.variables[config_options.hgt_var][:].data[pet_node_inds] + self.height = esmf_nc.variables[self.config_options.hgt_var][:].data[ + pet_node_inds + ] # Read in a scatter the mesh element elevation, which is used for downscaling purposes. - self.height_elem = idTmp.variables[config_options.hgt_elem_var][:].data[ - pet_element_inds - ] + self.height_elem = esmf_nc.variables[self.config_options.hgt_elem_var][ + : + ].data[pet_element_inds] # Calculate the slope from the domain using elevation on the WRF-Hydro domain. This will # be used for downscaling purposes. - slope_node_Tmp, slp_azi_node_tmp, slope_elem_Tmp, slp_azi_elem_tmp = ( - self.calc_slope_unstructured(idTmp, config_options) + slope_node_tmp, slp_azi_node_tmp, slope_elem_tmp, slp_azi_elem_tmp = ( + self.calc_slope_unstructured(esmf_nc) ) - self.slope = slope_node_Tmp[pet_node_inds] - slope_node_Tmp = None + self.slope = slope_node_tmp[pet_node_inds] + slope_node_tmp = None self.slp_azi = slp_azi_node_tmp[pet_node_inds] - slp_azi__node_tmp = None + slp_azi_node_tmp = None - self.slope_elem = slope_elem_Tmp[pet_element_inds] - slope_elem_Tmp = None + self.slope_elem = slope_elem_tmp[pet_element_inds] + slope_elem_tmp = None self.slp_azi_elem = slp_azi_elem_tmp[pet_element_inds] slp_azi_elem_tmp = None @@ -1111,69 +1221,84 @@ def initialize_destination_geo_unstructured(self, config_options, mpi_config): pet_node_inds = None pet_element_inds = None - def calc_slope_unstructured(self, idTmp, config_options): + @property + def nx_local(self) -> int: + """Get the local x dimension size for this processor.""" + return len(self.esmf_grid.coords[0][1]) + + @property + def ny_local(self) -> int: + """Get the local y dimension size for this processor.""" + return len(self.esmf_grid.coords[0][1]) + + @property + def nx_local_elem(self) -> int: + """Get the local x dimension size for this processor.""" + return len(self.esmf_grid.coords[1][1]) + + @property + def ny_local_elem(self) -> int: + """Get the local y dimension size for this processor.""" + return len(self.esmf_grid.coords[1][1]) + + def calc_slope_unstructured(self, esmf_nc: netCDF4.Dataset) -> tuple: """Calculate slope grids needed for incoming shortwave radiation downscaling. Function to calculate slope grids needed for incoming shortwave radiation downscaling later during the program. This calculates the slopes for both nodes and elements - :param idTmp: - :param config_options: - :return: + :param esmf_nc: The open netCDF4 dataset for the geogrid file, passed in to avoid having to reopen the file multiple times + :return: A tuple containing slope and slope azimuth for nodes and elements """ - idTmp = netCDF4.Dataset(config_options.geogrid, "r") + esmf_nc = netCDF4.Dataset(self.config_options.geogrid, "r") try: - node_lons = idTmp.variables[config_options.nodecoords_var][:][:, 0] - node_lats = idTmp.variables[config_options.nodecoords_var][:][:, 1] + node_lons = esmf_nc.variables[self.config_options.nodecoords_var][:][:, 0] + node_lats = esmf_nc.variables[self.config_options.nodecoords_var][:][:, 1] except Exception as e: - config_options.errMsg = ( - "Unable to extract node coordinates in " + config_options.geogrid + self.config_options.errMsg = ( + f"Unable to extract node coordinates in {self.config_options.geogrid}" ) - raise Exception + raise e try: - elem_lons = idTmp.variables[config_options.elemcoords_var][:][:, 0] - elem_lats = idTmp.variables[config_options.elemcoords_var][:][:, 1] + elem_lons = esmf_nc.variables[self.config_options.elemcoords_var][:][:, 0] + elem_lats = esmf_nc.variables[self.config_options.elemcoords_var][:][:, 1] except Exception as e: - config_options.errMsg = ( - "Unable to extract element coordinates in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract element coordinates in {self.config_options.geogrid}" + raise e try: - elem_conn = idTmp.variables[config_options.elemconn_var][:][:, 0] + elem_conn = esmf_nc.variables[self.config_options.elemconn_var][:][:, 0] except Exception as e: - config_options.errMsg = ( - "Unable to extract element connectivity in " + config_options.geogrid - ) - raise Exception + self.config_options.errMsg = f"Unable to extract element connectivity in {self.config_options.geogrid}" + raise e try: - node_heights = idTmp.variables[config_options.hgt_var][:] + node_heights = esmf_nc.variables[self.config_options.hgt_var][:] except Exception as e: - config_options.errMsg = ( - "Unable to extract HGT_M from: " + config_options.geogrid + self.config_options.errMsg = ( + f"Unable to extract HGT_M from: {self.config_options.geogrid}" ) - raise + raise e if node_heights.shape[0] != self.ny_global: - config_options.errMsg = ( - "HGT_M dimension mismatch in: " + config_options.geogrid + self.config_options.errMsg = ( + f"HGT_M dimension mismatch in: {self.config_options.geogrid}" ) raise Exception try: - elem_heights = idTmp.variables[config_options.hgt_elem_var][:] + elem_heights = esmf_nc.variables[self.config_options.hgt_elem_var][:] except Exception as e: - config_options.errMsg = ( - "Unable to extract HGT_M_ELEM from: " + config_options.geogrid + self.config_options.errMsg = ( + f"Unable to extract HGT_M_ELEM from: {self.config_options.geogrid}" ) - raise + raise e if elem_heights.shape[0] != len(elem_lons): - config_options.errMsg = ( - "HGT_M_ELEM dimension mismatch in: " + config_options.geogrid + self.config_options.errMsg = ( + f"HGT_M_ELEM dimension mismatch in: {self.config_options.geogrid}" ) raise Exception - idTmp.close() + esmf_nc.close() # calculate node coordinate distances in meters # based on general geospatial formula approximations @@ -1213,156 +1338,3 @@ def calc_slope_unstructured(self, idTmp, config_options): dz = None return slope_nodes, slp_azi_nodes, slope_elem, slp_azi_elem - - def initialize_destination_geo_hydrofabric(self, config_options, mpi_config): - """Initialize GeoMetaWrfHydro class variables. - - Initialization function to initialize ESMF through ESMPy, - calculate the global parameters of the WRF-Hydro grid - being processed to, along with the local parameters - for this particular processor. - :return: - """ - - if config_options.geogrid is not None: - # Phase 1: Rank 0 extracts all needed global data - if mpi_config.rank == 0: - try: - idTmp = nc_utils.nc_Dataset_retry( - mpi_config, - config_options, - err_handler, - config_options.geogrid, - "r", - ) - - # Extract everything we need with retries - tmp_vars = idTmp.variables - - if config_options.aws: - nodecoords_data = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.nodecoords_var], - ) - self.lat_bounds = nodecoords_data[:, 1] - self.lon_bounds = nodecoords_data[:, 0] - - # Store these for later broadcast/scatter - elementcoords_global = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.elemcoords_var], - ) - - self.nx_global = elementcoords_global.shape[0] - self.ny_global = self.nx_global - - element_ids_global = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.element_id_var], - ) - - heights_global = None - if config_options.hgt_var is not None: - heights_global = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.hgt_var], - ) - slopes_global = None - slp_azi_global = None - if config_options.slope_var is not None: - slopes_global = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.slope_var], - ) - if config_options.slope_azimuth_var is not None: - slp_azi_global = nc_utils.nc_read_var_retry( - mpi_config, - config_options, - err_handler, - tmp_vars[config_options.slope_azimuth_var], - ) - - except Exception as e: - LOG.critical( - f"Failed to open mesh file: {config_options.geogrid} " - f"due to {str(e)}" - ) - raise - finally: - idTmp.close() - else: - elementcoords_global = None - element_ids_global = None - heights_global = None - slopes_global = None - slp_azi_global = None - - # Broadcast dimensions - self.nx_global = mpi_config.broadcast_parameter( - self.nx_global, config_options, param_type=int - ) - self.ny_global = mpi_config.broadcast_parameter( - self.ny_global, config_options, param_type=int - ) - - mpi_config.comm.barrier() - - # Phase 2: Create ESMF Mesh (collective operation with retry) - try: - self.esmf_grid = esmf_utils.esmf_mesh_retry( - mpi_config, - config_options, - err_handler, - filename=config_options.geogrid, - filetype=ESMF.FileFormat.ESMFMESH, - ) - except Exception as e: - LOG.critical( - f"Unable to create ESMF Mesh: {config_options.geogrid} " - f"due to {str(e)}" - ) - raise - - # Get processor bounds - self.get_processor_bounds(config_options) - - # Extract local coordinates from ESMF mesh - self.latitude_grid = self.esmf_grid.coords[1][1] - self.longitude_grid = self.esmf_grid.coords[1][0] - - # Phase 3: Broadcast global arrays and compute local indices - elementcoords_global = mpi_config.comm.bcast(elementcoords_global, root=0) - element_ids_global = mpi_config.comm.bcast(element_ids_global, root=0) - - # Each rank computes its own local indices - pet_elementcoords = np.column_stack( - [self.longitude_grid, self.latitude_grid] - ) - tree = spatial.KDTree(elementcoords_global) - _, pet_element_inds = tree.query(pet_elementcoords) - - self.element_ids = element_ids_global[pet_element_inds] - self.element_ids_global = element_ids_global - - # Broadcast and extract height/slope data - if config_options.hgt_var is not None: - heights_global = mpi_config.comm.bcast(heights_global, root=0) - self.height = heights_global[pet_element_inds] - - if config_options.slope_var is not None: - slopes_global = mpi_config.comm.bcast(slopes_global, root=0) - slp_azi_global = mpi_config.comm.bcast(slp_azi_global, root=0) - self.slope = slopes_global[pet_element_inds] - self.slp_azi = slp_azi_global[pet_element_inds] - - self.mesh_inds = pet_element_inds diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/regrid.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/regrid.py index 99126c62..df0b9e89 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/regrid.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/core/regrid.py @@ -43,7 +43,7 @@ ConfigOptions, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.geoMod import ( - GeoMetaWrfHydro, + GeoMeta, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig from nextgen_forcings_ewts import MODULE_NAME @@ -11979,7 +11979,7 @@ def check_supp_pcp_regrid_status( def get_weight_file_names( mpi_config: MpiConfig, config_options: ConfigOptions, - input_forcings: GeoMetaWrfHydro, + input_forcings: GeoMeta, ) -> tuple[str | None, str | None]: """Get weight file names for regridding.""" if not config_options.weightsDir: @@ -12005,7 +12005,7 @@ def get_weight_file_names( def load_weight_file( mpi_config: MpiConfig, config_options: ConfigOptions, - input_forcings: GeoMetaWrfHydro, + input_forcings: GeoMeta, weight_file: str, element_mode: bool, ) -> None: @@ -12051,7 +12051,7 @@ def load_weight_file( def make_regrid( mpi_config: MpiConfig, config_options: ConfigOptions, - input_forcings: GeoMetaWrfHydro, + input_forcings: GeoMeta, weight_file: str | None, fill: bool, element_mode: bool, @@ -12120,7 +12120,7 @@ def make_regrid( def execute_regrid( mpi_config: MpiConfig, config_options: ConfigOptions, - input_forcings: GeoMetaWrfHydro, + input_forcings: GeoMeta, weight_file: str, element_mode: bool, ) -> None: diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py index a71a2f8c..d6bd80dc 100755 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/model.py @@ -18,7 +18,7 @@ ConfigOptions, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.geoMod import ( - GeoMetaWrfHydro, + GeoMeta, ) from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.ioMod import OutputObj from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig @@ -80,7 +80,7 @@ def run( model: dict, future_time: float, config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMetaWrfHydro, + wrf_hydro_geo_meta: GeoMeta, input_forcing_mod: dict, supp_pcp_mod: dict, mpi_config: MpiConfig, @@ -347,7 +347,7 @@ def loop_through_forcing_products( self, future_time: float, config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMetaWrfHydro, + wrf_hydro_geo_meta: GeoMeta, input_forcing_mod: dict, supp_pcp_mod: dict, mpi_config: MpiConfig, @@ -664,7 +664,7 @@ def loop_through_forcing_products( def process_suplemental_precip( self, config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMetaWrfHydro, + wrf_hydro_geo_meta: GeoMeta, supp_pcp_mod: dict, mpi_config: MpiConfig, output_obj: OutputObj, @@ -736,7 +736,7 @@ def process_suplemental_precip( def write_output( self, config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMetaWrfHydro, + wrf_hydro_geo_meta: GeoMeta, mpi_config: MpiConfig, output_obj: OutputObj, ): @@ -764,7 +764,7 @@ def update_dict( self, model: dict, config_options: ConfigOptions, - wrf_hydro_geo_meta: GeoMetaWrfHydro, + wrf_hydro_geo_meta: GeoMeta, output_obj: OutputObj, ): """Flatten the Forcings Engine output object and update the BMI dictionary.""" diff --git a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py index 5b2598a2..dc14dfb5 100644 --- a/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py +++ b/NextGen_Forcings_Engine_BMI/NextGen_Forcings_Engine/retry_utils.py @@ -3,8 +3,8 @@ import traceback import types -from .core.config import ConfigOptions -from .core.parallel import MpiConfig +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.parallel import MpiConfig +from NextGen_Forcings_Engine_BMI.NextGen_Forcings_Engine.core.config import ConfigOptions def retry_w_mpi_context(