forked from NOAA-OWP/icefabric
NWM Request: Query Modules based on catchments #8
Open
Description
Summary
The NWM team requested the ability to query the Icefabric API endpoints by catchment ID rather than just gauge ID.
Sample implementation
Instead of passing only a gauge ID to the get_XXX_parameters functions in the config mapper, we can also accept catchment IDs.

Below is the SFT parameter mapper, where the identifier is currently the gauge ID. If the identifier is not a gauge ID, we only need to query the catalog for the divide-attributes information for those specific catchment IDs.
def get_sft_parameters(
    catalog: Catalog,
    namespace: str,
    identifier: str,
    graph: rx.PyDiGraph,
    use_schaake: bool = False,
) -> list[SFT]:
    """Creates the initial parameter estimates for the SFT module

    Parameters
    ----------
    catalog : Catalog
        the pyiceberg lakehouse catalog
    namespace : str
        the hydrofabric namespace
    identifier : str
        the gauge identifier
    graph : rx.PyDiGraph
        the hydrofabric network graph
    use_schaake : bool, optional
        A setting to determine if Schaake should be used for ice fraction, by default False

    Returns
    -------
    list[SFT]
        The list of all initial parameters for catchments using SFT
    """
    gauge: dict[str, pd.DataFrame | gpd.GeoDataFrame] = subset_hydrofabric(
        catalog=catalog,
        identifier=identifier,
        id_type=IdType.HL_URI,
        namespace=namespace,
        layers=["flowpaths", "nexus", "divides", "divide-attributes", "network"],
        graph=graph,
    )
    # ***** MAKE CHANGE HERE! *****
    attr = {"smcmax": "mean.smcmax", "bexp": "mode.bexp", "psisat": "geom_mean.psisat"}
    df = pl.DataFrame(gauge["divide-attributes"])
    expressions = [pl.col("divide_id")]  # Keep the divide_id
    for param_name, prefix in attr.items():
        # Find all columns that start with the prefix
        matching_cols = [col for col in df.columns if col.startswith(prefix)]
        if matching_cols:
            # Calculate mean across matching columns for each row.
            # NOTE: this assumes an even weighting. TODO: determine if we need weighted averaging
            expressions.append(
                pl.concat_list([pl.col(col) for col in matching_cols]).list.mean().alias(f"{param_name}_avg")
            )
        else:
            # Default to 0.0 if no matching columns are found
            expressions.append(pl.lit(0.0).alias(f"{param_name}_avg"))
    result_df = df.select(expressions)
    mean_temp = _get_mean_soil_temp()
    pydantic_models = []
    for row_dict in result_df.iter_rows(named=True):
        # Instantiate the Pydantic model for each row
        model_instance = SFT(
            catchment=row_dict["divide_id"],
            smcmax=row_dict["smcmax_avg"],
            b=row_dict["bexp_avg"],
            satpsi=row_dict["psisat_avg"],
            ice_fraction_scheme=IceFractionScheme.XINANJIANG
            if use_schaake is False
            else IceFractionScheme.SCHAAKE,
            soil_temperature=[
                mean_temp for _ in range(4)
            ],  # Assuming 45 degrees in all layers. TODO: Fix this as this doesn't make sense
        )
        pydantic_models.append(model_instance)
    return pydantic_models
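One way the requested dispatch could look, as a minimal self-contained sketch: branch on whether the identifier is a catchment (divide) ID and, if so, fetch divide-attributes directly instead of subsetting by gauge. The "cat-" prefix convention for divide IDs, the resolve_lookup helper name, and the sample gauge hl_uri below are assumptions for illustration, not confirmed API.

```python
def resolve_lookup(identifier: str) -> str:
    """Decide how to fetch divide-attributes for an identifier.

    Hypothetical helper: hydrofabric divide IDs are assumed to use the
    "cat-" prefix; anything else is treated as a gauge hl_uri and would
    go through the existing subset_hydrofabric path.
    """
    if identifier.startswith("cat-"):
        # Catchment ID: query the divide-attributes table directly
        return "divide-attributes"
    # Gauge ID: keep the current subset_hydrofabric flow
    return "subset_hydrofabric"


# Example: a mapper could accept either kind of identifier
print(resolve_lookup("cat-12345"))       # divide-attributes
print(resolve_lookup("gages-06710385"))  # subset_hydrofabric
```

With a branch like this, the averaging and SFT model construction above would stay unchanged; only the source of the divide-attributes frame differs between the two paths.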