From c2b6cba65173e6795cfdd50092bea87ccf6fc9b7 Mon Sep 17 00:00:00 2001 From: FlxPo Date: Tue, 17 Mar 2026 08:57:56 +0100 Subject: [PATCH] add time varying parameters --- mobility/__init__.py | 1 + .../destination_sequence_sampler.py | 19 ++- mobility/choice_models/population_trips.py | 30 ++-- mobility/choice_models/state_initializer.py | 24 +++- mobility/choice_models/state_updater.py | 57 +++++--- mobility/motives/home.py | 10 +- mobility/motives/leisure.py | 6 +- mobility/motives/motive.py | 45 +++--- mobility/motives/other.py | 10 +- mobility/motives/shopping.py | 6 +- mobility/motives/studies.py | 6 +- mobility/motives/work.py | 12 +- mobility/simulation_profile.py | 130 ++++++++++++++++++ mobility/validation_types.py | 6 + 14 files changed, 286 insertions(+), 76 deletions(-) create mode 100644 mobility/simulation_profile.py create mode 100644 mobility/validation_types.py diff --git a/mobility/__init__.py b/mobility/__init__.py index 7ebe037..1efb0ae 100644 --- a/mobility/__init__.py +++ b/mobility/__init__.py @@ -46,6 +46,7 @@ from mobility.choice_models.population_trips import PopulationTrips from mobility.choice_models.population_trips_parameters import PopulationTripsParameters +from .simulation_profile import ParameterProfile, SimulationStep from mobility.transport_graphs.speed_modifier import ( diff --git a/mobility/choice_models/destination_sequence_sampler.py b/mobility/choice_models/destination_sequence_sampler.py index d79cd97..afc7357 100644 --- a/mobility/choice_models/destination_sequence_sampler.py +++ b/mobility/choice_models/destination_sequence_sampler.py @@ -5,6 +5,7 @@ from scipy.stats import norm from mobility.choice_models.add_index import add_index +from mobility.simulation_profile import SimulationStep class DestinationSequenceSampler: """Samples destination sequences for trip chains. @@ -17,9 +18,9 @@ class DestinationSequenceSampler: def run( self, motives, + step: SimulationStep, transport_zones, remaining_sinks, - iteration, chains, demand_groups, costs, @@ -39,7 +40,6 @@ def run( transport_zones: Transport zone container used by motives. remaining_sinks (pl.DataFrame): Current sink state per (motive, to), including capacity and saturation utility penalty. - iteration (int): Iteration index (>=1). chains (pl.DataFrame): Chain steps with ["demand_group_id","motive_seq_id","motive","is_anchor","seq_step_index"]. demand_groups (pl.DataFrame): ["demand_group_id","home_zone_id"] (merged for origins). @@ -55,6 +55,7 @@ def run( utilities = self.get_utilities( motives, + step, transport_zones, remaining_sinks, costs, @@ -64,6 +65,7 @@ def run( dest_prob = self.get_destination_probability( utilities, motives, + step, parameters.dest_prob_cutoff ) @@ -103,13 +105,13 @@ def run( on=["demand_group_id", "motive_seq_id"] ) .drop(["home_zone_id", "motive"]) - .with_columns(iteration=pl.lit(iteration).cast(pl.UInt32)) + .with_columns(iteration=pl.lit(step.iteration).cast(pl.UInt32)) ) return chains - def get_utilities(self, motives, transport_zones, sinks, costs, cost_uncertainty_sd): + def get_utilities(self, motives, step: SimulationStep, transport_zones, sinks, costs, cost_uncertainty_sd): """Assemble per-(from,to,motive) utility with cost uncertainty. @@ -131,7 +133,7 @@ def get_utilities(self, motives, transport_zones, sinks, costs, cost_uncertainty - cost_bin_to_dest: ["motive","from","cost_bin","to","p_to"]. """ - utilities = [(m.name, m.get_utilities(transport_zones)) for m in motives] + utilities = [(m.name, m.get_utilities(transport_zones, parameters=m.get_parameters_at_step(step))) for m in motives] utilities = [u for u in utilities if u[1] is not None] utilities = [u[1].with_columns(motive=pl.lit(u[0])) for u in utilities] @@ -196,7 +198,7 @@ def offset_costs(costs, delta, prob): return costs_bin, cost_bin_to_dest - def get_destination_probability(self, utilities, motives, dest_prob_cutoff): + def get_destination_probability(self, utilities, motives, step: SimulationStep, dest_prob_cutoff): """Compute P(destination | from, motive) via a radiation-style model. @@ -220,7 +222,10 @@ def get_destination_probability(self, utilities, motives, dest_prob_cutoff): costs_bin = utilities[0] cost_bin_to_dest = utilities[1] - motives_lambda = {motive.name: motive.inputs["parameters"].radiation_lambda for motive in motives} + motives_lambda = { + motive.name: motive.get_parameters_at_step(step).radiation_lambda + for motive in motives + } prob = ( diff --git a/mobility/choice_models/population_trips.py b/mobility/choice_models/population_trips.py index a9b6cc8..b1d04ef 100644 --- a/mobility/choice_models/population_trips.py +++ b/mobility/choice_models/population_trips.py @@ -32,6 +32,7 @@ prune_tmp_artifacts, rehydrate_congestion_snapshot, ) +from mobility.simulation_profile import SimulationStep class PopulationTrips(FileAsset): """ @@ -183,8 +184,7 @@ def __init__( } super().__init__(inputs, cache_path) - - + def validate_motives(self, motives: List[Motive]) -> None: if not motives: @@ -303,7 +303,7 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da str(is_weekday), str(resume_plan.resume_from_iter), ) - tmp_folders = self.prepare_tmp_folders(cache_path, resume=(resume_plan.resume_from_iter is not None)) + tmp_folders = self.get_tmp_folders(cache_path, resume=(resume_plan.resume_from_iter is not None)) chains_by_motive, chains, demand_groups = self.state_initializer.get_chains( population, @@ -317,11 +317,15 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da chains_by_motive, demand_groups ) + + step = SimulationStep(iteration=1) + home_motive = [m for m in motives if m.name == "home"][0] stay_home_state, current_states = self.state_initializer.get_stay_home_state( demand_groups, home_night_dur, - motives, + home_motive, + step, parameters.min_activity_time_constant, ) @@ -374,13 +378,14 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da logging.info(f"Iteration n°{iteration}") seed = self.rng.getrandbits(64) + step = SimulationStep(iteration=iteration) ( self.destination_sequence_sampler.run( motives, + step, population.transport_zones, remaining_sinks, - iteration, chains_by_motive, demand_groups, costs, @@ -409,12 +414,12 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da costs_aggregator, remaining_sinks, motive_dur, - iteration, + step, tmp_folders, home_night_dur, stay_home_state, parameters, - motives + motives, ) transition_events_per_iter.append(transition_events) @@ -430,7 +435,8 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da remaining_sinks = self.state_updater.get_new_sinks( current_states_steps, sinks, - motives + motives, + step, ) # Save per-iteration checkpoint after all state has been advanced. @@ -449,6 +455,7 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da # If we resumed after completing all iterations (or start_iteration > n_iterations), # rebuild step-level flows from cached artifacts for final output. if "current_states_steps" not in locals(): + step = SimulationStep(iteration=parameters.n_iterations) possible_states_steps = self.state_updater.get_possible_states_steps( current_states, demand_groups, @@ -456,7 +463,7 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da costs_aggregator, remaining_sinks, motive_dur, - parameters.n_iterations, + step, motives, parameters.min_activity_time_constant, tmp_folders @@ -519,11 +526,12 @@ def remove(self, remove_checkpoints: bool = True): logging.info("Removed %s checkpoint files for run_key=%s", str(removed), str(run_key)) - def prepare_tmp_folders(self, cache_path, resume: bool = False): - """Create per-run temp folders next to the cache path. + def get_tmp_folders(self, cache_path: pathlib.Path, resume: bool = False) -> dict[str, pathlib.Path]: + """Return per-run temp folders next to the cache path. Args: cache_path (pathlib.Path): Target cache file used to derive temp roots. + resume (bool): When False, clears existing temp folders before reuse. Returns: dict[str, pathlib.Path]: Mapping of temp folder names to paths. diff --git a/mobility/choice_models/state_initializer.py b/mobility/choice_models/state_initializer.py index 9f12768..9dd541f 100644 --- a/mobility/choice_models/state_initializer.py +++ b/mobility/choice_models/state_initializer.py @@ -1,6 +1,9 @@ import math import polars as pl +from mobility.motives.motive import Motive +from mobility.simulation_profile import SimulationStep + class StateInitializer: """Builds initial chain demand, averages, and capacities for the model. @@ -272,7 +275,8 @@ def get_stay_home_state( self, demand_groups, home_night_dur, - motives, + home_motive: Motive, + step: SimulationStep, min_activity_time_constant: float, ): @@ -295,8 +299,8 @@ def get_stay_home_state( - current_states: A clone of `stay_home_state` for iteration start. """ - home_motive = [m for m in motives if m.name == "home"][0] - + value_of_time_stay_home = home_motive.get_parameters_at_step(step).value_of_time_stay_home + stay_home_state = ( demand_groups.select(["demand_group_id", "csp", "n_persons"]) @@ -309,13 +313,23 @@ def get_stay_home_state( .join(home_night_dur, on="csp") .with_columns( utility=( - home_motive.inputs["parameters"].value_of_time_stay_home + value_of_time_stay_home * pl.col("mean_home_night_per_pers") * (pl.col("mean_home_night_per_pers")/pl.col("mean_home_night_per_pers")/math.exp(-min_activity_time_constant)).log().clip(0.0) ) ) - .select(["demand_group_id", "iteration", "motive_seq_id", "mode_seq_id", "dest_seq_id", "utility", "n_persons"]) + .select([ + "demand_group_id", + "csp", + "mean_home_night_per_pers", + "iteration", + "motive_seq_id", + "mode_seq_id", + "dest_seq_id", + "utility", + "n_persons", + ]) ) current_states = ( diff --git a/mobility/choice_models/state_updater.py b/mobility/choice_models/state_updater.py index fcd30ab..f1de9c5 100644 --- a/mobility/choice_models/state_updater.py +++ b/mobility/choice_models/state_updater.py @@ -5,6 +5,7 @@ import polars as pl from mobility.choice_models.transition_schema import TRANSITION_EVENT_COLUMNS +from mobility.simulation_profile import SimulationStep class StateUpdater: """Updates population state distributions over motive/destination/mode sequences. @@ -22,12 +23,12 @@ def get_new_states( costs_aggregator: Any, remaining_sinks: pl.DataFrame, motive_dur: pl.DataFrame, - iteration: int, + step: SimulationStep, tmp_folders: dict[str, Any], home_night_dur: pl.DataFrame, stay_home_state: pl.DataFrame, parameters: Any, - motives: list[Any] + motives: list[Any], ) -> tuple[pl.DataFrame, pl.DataFrame, pl.DataFrame]: """Advance one iteration of state updates. @@ -44,7 +45,6 @@ def get_new_states( remaining_sinks (pl.DataFrame): Sink state per (motive,to) with capacity and saturation utility penalty. motive_dur (pl.DataFrame): Mean activity durations by (csp,motive). - iteration (int): Current iteration (1-based). tmp_folders (dict[str, pathlib.Path]): Paths to “spatialized-chains” and “modes”. home_night_dur (pl.DataFrame): Mean remaining home-night duration by csp. stay_home_state (pl.DataFrame): Baseline “stay-home” state rows. @@ -62,7 +62,7 @@ def get_new_states( costs_aggregator, remaining_sinks, motive_dur, - iteration, + step, motives, parameters.min_activity_time_constant, tmp_folders @@ -70,21 +70,21 @@ def get_new_states( self._assert_current_states_covered_by_possible_steps( current_states, possible_states_steps, - iteration + step.iteration ) home_motive = [m for m in motives if m.name == "home"][0] - + possible_states_utility = self.get_possible_states_utility( possible_states_steps, home_night_dur, - home_motive.inputs["parameters"].value_of_time_stay_home, + home_motive.get_parameters_at_step(step).value_of_time_stay_home, stay_home_state, parameters.min_activity_time_constant ) transition_prob = self.get_transition_probabilities(current_states, possible_states_utility) - current_states, transition_events = self.apply_transitions(current_states, transition_prob, iteration) + current_states, transition_events = self.apply_transitions(current_states, transition_prob, step.iteration) transition_events = self.add_transition_state_details(transition_events, possible_states_steps) current_states_steps = self.get_current_states_steps(current_states, possible_states_steps) @@ -141,7 +141,7 @@ def get_possible_states_steps( costs_aggregator, sinks, motive_dur, - iteration, + step, motives, min_activity_time_constant, tmp_folders @@ -159,7 +159,6 @@ def get_possible_states_steps( costs_aggregator (TravelCostsAggregator): Per-mode OD costs. sinks (pl.DataFrame): Sink state per (motive,to). motive_dur (pl.DataFrame): Mean durations per (csp,motive). - iteration (int): Current iteration to pick latest artifacts. activity_utility_coeff (float): Coefficient for activity utility. tmp_folders (dict[str, pathlib.Path]): Must contain "spatialized-chains" and "modes". @@ -209,16 +208,24 @@ def get_possible_states_steps( ) + motive_parameters_at_step = { + motive.name: motive.get_parameters_at_step(step) + for motive in motives + } + # Get the activities values of time value_of_time = ( pl.from_dicts( - [{"motive": m.name, "value_of_time": m.inputs["parameters"].value_of_time} for m in motives] + [ + {"motive": motive_name, "value_of_time": motive_parameters.value_of_time} + for motive_name, motive_parameters in motive_parameters_at_step.items() + ] ) .with_columns( motive=pl.col("motive").cast(pl.Enum(motive_dur["motive"].dtype.categories)) ) ) - + possible_states_steps = ( modes @@ -309,6 +316,16 @@ def get_possible_states_utility( possible_states_utility, ( stay_home_state.lazy() + .with_columns( + min_activity_time=pl.col("mean_home_night_per_pers")*math.exp(-min_activity_time_constant) + ) + .with_columns( + utility=( + value_of_time_stay_home + * pl.col("mean_home_night_per_pers") + * (pl.col("mean_home_night_per_pers")/pl.col("min_activity_time")).log().clip(0.0) + ) + ) .select(["demand_group_id", "motive_seq_id", "mode_seq_id", "dest_seq_id", "utility"]) ) ]) @@ -791,7 +808,8 @@ def get_new_sinks( self, current_states_steps, sinks, - motives + motives, + step: SimulationStep, ): """Recompute remaining opportunities per (motive, destination). @@ -809,15 +827,20 @@ def get_new_sinks( logging.info("Computing remaining opportunities at destinations...") + motive_parameters_at_step = { + motive.name: motive.get_parameters_at_step(step) + for motive in motives + } + saturation_fun_parameters = ( pl.from_dicts( [ { - "motive": m.name, - "beta": m.inputs["parameters"].saturation_fun_beta, - "ref_level": m.inputs["parameters"].saturation_fun_ref_level + "motive": motive_name, + "beta": motive_parameters.saturation_fun_beta, + "ref_level": motive_parameters.saturation_fun_ref_level } - for m in motives + for motive_name, motive_parameters in motive_parameters_at_step.items() ] ) .with_columns( diff --git a/mobility/motives/home.py b/mobility/motives/home.py index 54fcdd9..6e2b876 100644 --- a/mobility/motives/home.py +++ b/mobility/motives/home.py @@ -4,6 +4,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat class HomeMotive(Motive): @@ -46,13 +48,13 @@ class HomeMotiveParameters(MotiveParameters): """Parameters specific to the home motive.""" value_of_time: Annotated[ - float, - Field(default=10.0, ge=0.0), + float | ParameterProfile, + Field(default=10.0), ] value_of_time_stay_home: Annotated[ - float, - Field(default=0.0, ge=0.0), + float | ParameterProfile, + Field(default=0.0), ] saturation_fun_ref_level: Annotated[ diff --git a/mobility/motives/leisure.py b/mobility/motives/leisure.py index b2b4ae6..99500e4 100644 --- a/mobility/motives/leisure.py +++ b/mobility/motives/leisure.py @@ -14,6 +14,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class LeisureMotive(Motive): @@ -157,11 +159,11 @@ def plot_opportunities_map( class LeisureMotiveParameters(MotiveParameters): """Parameters specific to the leisure motive.""" - value_of_time: Annotated[float, Field(default=10.0, ge=0.0)] + value_of_time: Annotated[float | ParameterProfile, Field(default=10.0)] saturation_fun_ref_level: Annotated[float, Field(default=1.5, ge=0.0)] saturation_fun_beta: Annotated[float, Field(default=4.0, ge=0.0)] survey_ids: Annotated[ list[str], Field(default_factory=lambda: ["7.71", "7.72", "7.73", "7.74", "7.75", "7.76", "7.77", "7.78"]), ] - radiation_lambda: Annotated[float, Field(default=0.99986, ge=0.0, le=1.0)] + radiation_lambda: Annotated[UnitIntervalFloat, Field(default=0.99986)] diff --git a/mobility/motives/motive.py b/mobility/motives/motive.py index 473f435..c336cbf 100644 --- a/mobility/motives/motive.py +++ b/mobility/motives/motive.py @@ -7,6 +7,8 @@ from mobility.in_memory_asset import InMemoryAsset from pydantic import BaseModel, ConfigDict, Field +from mobility.simulation_profile import ParameterProfile, SimulationStep, resolve_model_for_step +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class Motive(InMemoryAsset): @@ -61,17 +63,33 @@ def __init__( super().__init__(inputs) - def get_utilities(self, transport_zones): + def get_parameters_at_step(self, step: SimulationStep) -> "MotiveParameters": + """Returns the motive parameters in effect at a simulation step. + + Args: + step: Simulation step used to evaluate step-varying parameter + profiles. + + Returns: + MotiveParameters: Parameter model with all step-varying fields + resolved to scalar values for ``step``. + """ + return resolve_model_for_step(self.inputs["parameters"], step) + + + def get_utilities(self, transport_zones, parameters: "MotiveParameters" | None = None): + + parameters = parameters or self.inputs["parameters"] if self.utilities is not None: utilities = self.utilities - elif self.inputs["parameters"].country_utilities is not None: + elif parameters.country_utilities is not None: transport_zones = transport_zones.get().drop("geometry", axis=1) transport_zones["country"] = transport_zones["local_admin_unit_id"].str[0:2] - transport_zones["utility"] = transport_zones["country"].map(self.inputs["parameters"].country_utilities) + transport_zones["utility"] = transport_zones["country"].map(parameters.country_utilities) utilities = pl.from_pandas( transport_zones[["transport_zone_id", "utility"]] @@ -108,40 +126,36 @@ class MotiveParameters(BaseModel): model_config = ConfigDict(extra="forbid") value_of_time: Annotated[ - float, + NonNegativeFloat | ParameterProfile, Field( default=10.0, - ge=0.0, title="Value of time", description="Utility weight for time spent traveling or on activities.", ), ] saturation_fun_ref_level: Annotated[ - float, + NonNegativeFloat, Field( default=1.5, - ge=0.0, title="Saturation reference level", description="Reference level used by the sink saturation utility function.", ), ] saturation_fun_beta: Annotated[ - float, + NonNegativeFloat, Field( default=4.0, - ge=0.0, title="Saturation beta", description="Shape parameter of the sink saturation utility function.", ), ] value_of_time_v2: Annotated[ - float | None, + NonNegativeFloat | ParameterProfile | None, Field( default=None, - ge=0.0, title="Alternative value of time", description="Optional alternative value of time for second utility formulation.", ), @@ -157,11 +171,9 @@ class MotiveParameters(BaseModel): ] radiation_lambda: Annotated[ - float | None, + UnitIntervalFloat | None, Field( default=None, - ge=0.0, - le=1.0, title="Radiation model lambda", description="Radiation-model parameter controlling destination choice dispersion.", ), @@ -177,11 +189,10 @@ class MotiveParameters(BaseModel): ] sink_saturation_coeff: Annotated[ - float, + NonNegativeFloat, Field( default=1.0, - ge=0.0, title="Sink saturation coefficient", description="Coefficient scaling sink saturation in activity utility.", ), - ] \ No newline at end of file + ] diff --git a/mobility/motives/other.py b/mobility/motives/other.py index 0cbb7d6..a1e0642 100644 --- a/mobility/motives/other.py +++ b/mobility/motives/other.py @@ -7,6 +7,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters from mobility.population import Population +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class OtherMotive(Motive): @@ -75,8 +77,8 @@ class OtherMotiveParameters(MotiveParameters): """Parameters specific to the other motive.""" value_of_time: Annotated[ - float, - Field(default=10.0, ge=0.0), + float | ParameterProfile, + Field(default=10.0), ] saturation_fun_ref_level: Annotated[ @@ -90,6 +92,6 @@ class OtherMotiveParameters(MotiveParameters): ] radiation_lambda: Annotated[ - float, - Field(default=0.99986, ge=0.0, le=1.0), + UnitIntervalFloat, + Field(default=0.99986), ] diff --git a/mobility/motives/shopping.py b/mobility/motives/shopping.py index c9884d0..d7fa703 100644 --- a/mobility/motives/shopping.py +++ b/mobility/motives/shopping.py @@ -8,6 +8,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters from mobility.parsers.shops_turnover_distribution import ShopsTurnoverDistribution +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class ShoppingMotive(Motive): @@ -79,8 +81,8 @@ def get_opportunities(self, transport_zones): class ShoppingMotiveParameters(MotiveParameters): """Parameters specific to the shopping motive.""" - value_of_time: Annotated[float, Field(default=10.0, ge=0.0)] + value_of_time: Annotated[float | ParameterProfile, Field(default=10.0)] saturation_fun_ref_level: Annotated[float, Field(default=1.5, ge=0.0)] saturation_fun_beta: Annotated[float, Field(default=4.0, ge=0.0)] survey_ids: Annotated[list[str], Field(default_factory=lambda: ["2.20", "2.21"])] - radiation_lambda: Annotated[float, Field(default=0.99986, ge=0.0, le=1.0)] \ No newline at end of file + radiation_lambda: Annotated[UnitIntervalFloat, Field(default=0.99986)] diff --git a/mobility/motives/studies.py b/mobility/motives/studies.py index 068a460..5b9ca3e 100644 --- a/mobility/motives/studies.py +++ b/mobility/motives/studies.py @@ -10,6 +10,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters from mobility.parsers.schools_capacity_distribution import SchoolsCapacityDistribution +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class StudiesMotive(Motive): @@ -157,8 +159,8 @@ def plot_opportunities_map( class StudiesMotiveParameters(MotiveParameters): """Parameters specific to the studies motive.""" - value_of_time: Annotated[float, Field(default=10.0, ge=0.0)] + value_of_time: Annotated[float | ParameterProfile, Field(default=10.0)] saturation_fun_ref_level: Annotated[float, Field(default=1.5, ge=0.0)] saturation_fun_beta: Annotated[float, Field(default=4.0, ge=0.0)] survey_ids: Annotated[list[str], Field(default_factory=lambda: ["1.11"])] - radiation_lambda: Annotated[float, Field(default=0.99986, ge=0.0, le=1.0)] + radiation_lambda: Annotated[UnitIntervalFloat, Field(default=0.99986)] diff --git a/mobility/motives/work.py b/mobility/motives/work.py index d9d8761..680ac9e 100644 --- a/mobility/motives/work.py +++ b/mobility/motives/work.py @@ -8,6 +8,8 @@ from pydantic import Field from mobility.motives.motive import Motive, MotiveParameters from mobility.parsers import JobsActivePopulationDistribution +from mobility.simulation_profile import ParameterProfile +from mobility.validation_types import NonNegativeFloat, UnitIntervalFloat class WorkMotive(Motive): @@ -86,8 +88,8 @@ class WorkMotiveParameters(MotiveParameters): """Parameters specific to the work motive.""" value_of_time: Annotated[ - float, - Field(default=10.0, ge=0.0), + float | ParameterProfile, + Field(default=10.0), ] saturation_fun_ref_level: Annotated[ @@ -106,11 +108,11 @@ class WorkMotiveParameters(MotiveParameters): ] radiation_lambda: Annotated[ - float, - Field(default=0.99986, ge=0.0, le=1.0), + UnitIntervalFloat, + Field(default=0.99986), ] country_utilities: Annotated[ dict[str, float], Field(default_factory=lambda: {"fr": 0.0, "ch": 5.0}), - ] \ No newline at end of file + ] diff --git a/mobility/simulation_profile.py b/mobility/simulation_profile.py new file mode 100644 index 0000000..1f4eed8 --- /dev/null +++ b/mobility/simulation_profile.py @@ -0,0 +1,130 @@ +from __future__ import annotations + +from typing import Any, Literal, TypeVar + +import numpy as np +from pydantic import BaseModel, ConfigDict, Field, model_validator + +T = TypeVar("T", bound=BaseModel) + + +class SimulationStep(BaseModel): + """Identifies one simulation step. + + Attributes: + iteration: One-based simulation iteration index. + """ + + model_config = ConfigDict(extra="forbid") + + iteration: int = Field(ge=1) + + +class ParameterProfile(BaseModel): + """Defines a parameter value profile over simulation iterations. + + The profile is specified by control points keyed by iteration index. + Values can be evaluated with step-wise or linear interpolation. + + Attributes: + mode: Evaluation mode used between control points. + points: Mapping from iteration index to parameter value. + """ + + model_config = ConfigDict(extra="forbid") + + mode: Literal["step", "linear"] = "step" + points: dict[int, float] + + @model_validator(mode="after") + def validate_points(self) -> "ParameterProfile": + """Validates control points after model initialization. + + Returns: + ParameterProfile: The validated profile instance. + + Raises: + ValueError: If no control point is provided or if an iteration + index is lower than 1. + """ + if not self.points: + raise ValueError("ParameterProfile.points must not be empty.") + + invalid_steps = [step for step in self.points if step < 1] + if invalid_steps: + raise ValueError("ParameterProfile.points keys must be >= 1.") + + return self + + def at(self, step: SimulationStep) -> float: + """Evaluates the profile at a simulation step. + + Args: + step: Simulation step at which to evaluate the profile. + + Returns: + float: Parameter value at the requested step. + """ + sorted_points = sorted(self.points.items()) + iterations = np.array([iteration for iteration, _ in sorted_points], dtype=float) + values = np.array([value for _, value in sorted_points], dtype=float) + + if self.mode == "step": + idx = np.searchsorted(iterations, step.iteration, side="right") - 1 + idx = max(idx, 0) + return float(values[idx]) + + return float(np.interp(step.iteration, iterations, values)) + +def resolve_value_for_step(value: Any, step: SimulationStep) -> Any: + """Resolves one value for a simulation step. + + Supported inputs are: + - ``ParameterProfile`` instances, which are evaluated at ``step`` + - pydantic models, which are resolved field by field + - ``dict``/``list``/``tuple`` containers, which are resolved recursively + - plain leaf values, which are returned unchanged + + Args: + value: Value to resolve. + step: Simulation step used for evaluation. + + Returns: + Any: Resolved value for the requested step. + """ + + if isinstance(value, ParameterProfile): + return value.at(step) + + if isinstance(value, BaseModel): + return resolve_model_for_step(value, step) + + if isinstance(value, dict): + return {key: resolve_value_for_step(item, step) for key, item in value.items()} + + if isinstance(value, list): + return [resolve_value_for_step(item, step) for item in value] + + if isinstance(value, tuple): + return tuple(resolve_value_for_step(item, step) for item in value) + + return value + + +def resolve_model_for_step(model: T, step: SimulationStep) -> T: + """Resolves step-varying fields of a pydantic model. + + Args: + model: Pydantic model whose fields may contain step-varying values. + step: Simulation step used for evaluation. + + Returns: + T: New validated model instance with step-varying fields replaced by + their scalar values at ``step``. + """ + + resolved_data = { + field_name: resolve_value_for_step(getattr(model, field_name), step) + for field_name in model.__class__.model_fields + } + return model.__class__.model_validate(resolved_data) diff --git a/mobility/validation_types.py b/mobility/validation_types.py new file mode 100644 index 0000000..f912a9c --- /dev/null +++ b/mobility/validation_types.py @@ -0,0 +1,6 @@ +from typing import Annotated + +from pydantic import Field + +NonNegativeFloat = Annotated[float, Field(ge=0.0)] +UnitIntervalFloat = Annotated[float, Field(ge=0.0, le=1.0)]