diff --git a/mobility/__init__.py b/mobility/__init__.py index 7ebe037..6553068 100644 --- a/mobility/__init__.py +++ b/mobility/__init__.py @@ -45,7 +45,7 @@ from mobility.choice_models.population_trips import PopulationTrips -from mobility.choice_models.population_trips_parameters import PopulationTripsParameters +from mobility.choice_models.population_trips_parameters import PopulationTripsParameters, BehaviorChangePhase, BehaviorChangeScope from mobility.transport_graphs.speed_modifier import ( diff --git a/mobility/choice_models/population_trips.py b/mobility/choice_models/population_trips.py index a9b6cc8..eff7fdf 100644 --- a/mobility/choice_models/population_trips.py +++ b/mobility/choice_models/population_trips.py @@ -14,7 +14,13 @@ from mobility.file_asset import FileAsset from mobility.population import Population from mobility.choice_models.travel_costs_aggregator import TravelCostsAggregator -from mobility.choice_models.population_trips_parameters import PopulationTripsParameters +from mobility.choice_models.population_trips_parameters import ( + PopulationTripsParameters, +) +from mobility.choice_models.population_trips_candidates import ( + get_mode_sequences, + get_spatialized_chains, +) from mobility.choice_models.destination_sequence_sampler import DestinationSequenceSampler from mobility.choice_models.top_k_mode_sequence_search import TopKModeSequenceSearch from mobility.choice_models.state_initializer import StateInitializer @@ -375,32 +381,36 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da seed = self.rng.getrandbits(64) - ( - self.destination_sequence_sampler.run( - motives, - population.transport_zones, - remaining_sinks, - iteration, - chains_by_motive, - demand_groups, - costs, - tmp_folders, - parameters, - seed - ) - .write_parquet(tmp_folders["spatialized-chains"] / f"spatialized_chains_{iteration}.parquet") + behavior_change_scope = parameters.get_behavior_change_scope(iteration) + + 
spatialized_chains = get_spatialized_chains( + behavior_change_scope=behavior_change_scope, + current_states=current_states, + destination_sequence_sampler=self.destination_sequence_sampler, + motives=motives, + transport_zones=population.transport_zones, + remaining_sinks=remaining_sinks, + iteration=iteration, + chains_by_motive=chains_by_motive, + demand_groups=demand_groups, + costs=costs, + tmp_folders=tmp_folders, + parameters=parameters, + seed=seed, ) - - - ( - self.top_k_mode_sequence_search.run( - iteration, - costs_aggregator, - tmp_folders, - parameters - ) - .write_parquet(tmp_folders["modes"] / f"mode_sequences_{iteration}.parquet") + spatialized_chains.write_parquet( + tmp_folders["spatialized-chains"] / f"spatialized_chains_{iteration}.parquet" + ) + + mode_sequences = get_mode_sequences( + spatialized_chains=spatialized_chains, + top_k_mode_sequence_search=self.top_k_mode_sequence_search, + iteration=iteration, + costs_aggregator=costs_aggregator, + tmp_folders=tmp_folders, + parameters=parameters, ) + mode_sequences.write_parquet(tmp_folders["modes"] / f"mode_sequences_{iteration}.parquet") current_states, current_states_steps, transition_events = self.state_updater.get_new_states( current_states, @@ -459,7 +469,8 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da parameters.n_iterations, motives, parameters.min_activity_time_constant, - tmp_folders + tmp_folders, + parameters.get_behavior_change_scope(parameters.n_iterations), ) current_states_steps = self.state_updater.get_current_states_steps(current_states, possible_states_steps) diff --git a/mobility/choice_models/population_trips_candidates.py b/mobility/choice_models/population_trips_candidates.py new file mode 100644 index 0000000..4142ddd --- /dev/null +++ b/mobility/choice_models/population_trips_candidates.py @@ -0,0 +1,283 @@ +import pathlib + +import polars as pl + +from mobility.choice_models.population_trips_parameters import ( + BehaviorChangeScope, 
+ PopulationTripsParameters, +) + + +def get_spatialized_chains( + behavior_change_scope: BehaviorChangeScope, + current_states: pl.DataFrame, + destination_sequence_sampler, + motives, + transport_zones, + remaining_sinks: pl.DataFrame, + iteration: int, + chains_by_motive: pl.DataFrame, + demand_groups: pl.DataFrame, + costs: pl.DataFrame, + tmp_folders: dict[str, pathlib.Path], + parameters: PopulationTripsParameters, + seed: int, + ) -> pl.DataFrame: + """Get spatialized chains for the current simulation step. + + Args: + behavior_change_scope: Active behavior-change scope for the step. + current_states: Aggregate states occupied before the step update. + destination_sequence_sampler: Sampler used when destination resampling + is allowed. + motives: Available motives for the simulation. + transport_zones: Transport zones used to spatialize destinations. + remaining_sinks: Remaining destination capacities. + iteration: Current simulation iteration number. + chains_by_motive: Full chain templates indexed by motive sequence. + demand_groups: Demand-group metadata used during spatialization. + costs: Current OD costs used by destination sampling. + tmp_folders: Temporary folders for intermediate iteration artifacts. + parameters: PopulationTrips parameters. + seed: RNG seed used for destination sampling. + + Returns: + Spatialized chains to use for the current step. 
+ """ + if behavior_change_scope == BehaviorChangeScope.FULL_REPLANNING: + chains_to_sample = chains_by_motive + elif behavior_change_scope == BehaviorChangeScope.DESTINATION_REPLANNING: + chains_to_sample = get_active_motive_chains( + chains_by_motive=chains_by_motive, + current_states=current_states, + ) + elif behavior_change_scope == BehaviorChangeScope.MODE_REPLANNING: + return get_active_destination_sequences( + current_states=current_states, + iteration=iteration, + tmp_folders=tmp_folders, + ) + else: + raise ValueError(f"Unsupported behavior change scope: {behavior_change_scope}") + + if chains_to_sample.height == 0: + if get_active_non_stay_home_states(current_states).height > 0: + raise ValueError( + "No chains available for active non-stay-home states at " + f"iteration={iteration} with behavior_change_scope={behavior_change_scope.value}." + ) + return empty_spatialized_chains() + + return destination_sequence_sampler.run( + motives, + transport_zones, + remaining_sinks, + iteration, + chains_to_sample, + demand_groups, + costs, + tmp_folders, + parameters, + seed, + ) + + +def get_mode_sequences( + spatialized_chains: pl.DataFrame, + top_k_mode_sequence_search, + iteration: int, + costs_aggregator, + tmp_folders: dict[str, pathlib.Path], + parameters: PopulationTripsParameters, + ) -> pl.DataFrame: + """Get mode sequences for the current simulation step. + + Args: + spatialized_chains: Spatialized chains selected for the current step. + top_k_mode_sequence_search: Searcher that computes top-k mode + sequences for each spatialized chain. + iteration: Current simulation iteration number. + costs_aggregator: Provides OD costs by transport mode. + tmp_folders: Temporary folders for intermediate iteration artifacts. + parameters: PopulationTrips parameters. + + Returns: + Mode sequences to use for the current step. 
+ """ + if spatialized_chains.height == 0: + return empty_mode_sequences() + + return top_k_mode_sequence_search.run( + iteration, + costs_aggregator, + tmp_folders, + parameters, + ) + + +def get_active_motive_chains( + chains_by_motive: pl.DataFrame, + current_states: pl.DataFrame, + ) -> pl.DataFrame: + """Keep chain templates for motive sequences currently selected. + + Args: + chains_by_motive: Full chain-template table. + current_states: Aggregate states occupied before the step update. + + Returns: + Chain templates restricted to active non-stay-home motive sequences. + """ + active_motive_sequences = get_active_non_stay_home_states(current_states).select( + ["demand_group_id", "motive_seq_id"] + ).unique() + + if active_motive_sequences.height == 0: + return chains_by_motive.head(0) + + active_motive_sequences = active_motive_sequences.with_columns( + demand_group_id=pl.col("demand_group_id").cast(chains_by_motive.schema["demand_group_id"]), + motive_seq_id=pl.col("motive_seq_id").cast(chains_by_motive.schema["motive_seq_id"]), + ) + + return chains_by_motive.join( + active_motive_sequences, + on=["demand_group_id", "motive_seq_id"], + how="inner", + ) + + +def get_active_destination_sequences( + current_states: pl.DataFrame, + iteration: int, + tmp_folders: dict[str, pathlib.Path], + ) -> pl.DataFrame: + """Reuse active destination sequences and tag them for a new iteration. + + Args: + current_states: Aggregate states occupied before the step update. + iteration: Current simulation iteration number. + tmp_folders: Temporary folders containing prior spatialized chains. + + Returns: + Spatialized chains matching the active non-stay-home destination + sequences, restamped with the current iteration. 
+ """ + active_dest_sequences = get_active_non_stay_home_states(current_states).select( + ["demand_group_id", "motive_seq_id", "dest_seq_id"] + ).unique() + + if active_dest_sequences.height == 0: + return empty_spatialized_chains() + + available_chains = get_latest_spatialized_chains(tmp_folders) + if available_chains is None: + raise ValueError( + "No prior spatialized chains available for active non-stay-home " + f"states at iteration={iteration}." + ) + + active_dest_sequences = active_dest_sequences.with_columns( + demand_group_id=pl.col("demand_group_id").cast(pl.UInt32), + motive_seq_id=pl.col("motive_seq_id").cast(pl.UInt32), + dest_seq_id=pl.col("dest_seq_id").cast(pl.UInt32), + ) + + reused = ( + available_chains + .join( + active_dest_sequences.lazy(), + on=["demand_group_id", "motive_seq_id", "dest_seq_id"], + how="inner", + ) + .with_columns(iteration=pl.lit(iteration).cast(pl.UInt32())) + .collect(engine="streaming") + ) + + if reused.height == 0: + raise ValueError( + "Active non-stay-home states could not be matched to reusable " + f"destination chains at iteration={iteration}." + ) + + return reused + + +def get_active_non_stay_home_states(current_states: pl.DataFrame) -> pl.DataFrame: + """Get active non-stay-home states from the current aggregate state table. + + Args: + current_states: Aggregate states occupied before the step update. + + Returns: + Distinct active non-stay-home state keys. + """ + return ( + current_states + .filter(pl.col("motive_seq_id") != 0) + .select(["demand_group_id", "motive_seq_id", "dest_seq_id", "mode_seq_id"]) + .unique() + ) + + +def get_latest_spatialized_chains(tmp_folders: dict[str, pathlib.Path]) -> pl.LazyFrame | None: + """Load the latest available spatialized chains across prior iterations. + + Args: + tmp_folders: Temporary folders containing spatialized-chain parquet + files. 
+ + Returns: + A lazy frame containing the most recent row for each state-step key, or + ``None`` if no spatialized chains have been saved yet. + """ + if not any(tmp_folders["spatialized-chains"].glob("spatialized_chains_*.parquet")): + return None + + pattern = tmp_folders["spatialized-chains"] / "spatialized_chains_*.parquet" + return ( + pl.scan_parquet(str(pattern)) + .sort("iteration", descending=True) + .unique( + subset=["demand_group_id", "motive_seq_id", "dest_seq_id", "seq_step_index"], + keep="first", + ) + ) + + +def empty_spatialized_chains() -> pl.DataFrame: + """Create an empty spatialized-chains table with the expected schema. + + Returns: + Empty spatialized chains. + """ + return pl.DataFrame( + schema={ + "demand_group_id": pl.UInt32, + "motive_seq_id": pl.UInt32, + "dest_seq_id": pl.UInt32, + "seq_step_index": pl.UInt32, + "from": pl.Int32, + "to": pl.Int32, + "iteration": pl.UInt32, + } + ) + + +def empty_mode_sequences() -> pl.DataFrame: + """Create an empty mode-sequences table with the expected schema. + + Returns: + Empty mode sequences. + """ + return pl.DataFrame( + schema={ + "demand_group_id": pl.UInt32, + "motive_seq_id": pl.UInt32, + "dest_seq_id": pl.UInt32, + "mode_seq_id": pl.UInt32, + "seq_step_index": pl.UInt32, + "mode": pl.Utf8, + "iteration": pl.UInt32, + } + ) diff --git a/mobility/choice_models/population_trips_parameters.py b/mobility/choice_models/population_trips_parameters.py index a9a377d..fcfd2fc 100644 --- a/mobility/choice_models/population_trips_parameters.py +++ b/mobility/choice_models/population_trips_parameters.py @@ -1,6 +1,64 @@ -from pydantic import BaseModel, Field, ConfigDict +from enum import Enum from typing import Annotated +from pydantic import BaseModel, ConfigDict, Field, model_validator + + +class BehaviorChangeScope(str, Enum): + """Highest adaptation layer allowed during one behavior-change phase. 
+ + Attributes: + FULL_REPLANNING: Resample motive sequences, then dependent destination + and mode sequences. Stay-home transitions remain available. + DESTINATION_REPLANNING: Keep each currently occupied non-stay-home + motive sequence fixed and resample destination sequences plus + dependent mode sequences. Stay-home is frozen. + MODE_REPLANNING: Keep each currently occupied non-stay-home motive and + destination sequence fixed and resample mode sequences only. + Stay-home is frozen. + """ + + FULL_REPLANNING = "full_replanning" + DESTINATION_REPLANNING = "destination_replanning" + MODE_REPLANNING = "mode_replanning" + + +class BehaviorChangePhase(BaseModel): + """Behavior-change phase applied from ``start_iteration`` onward. + + Attributes: + start_iteration: First simulation iteration where this phase applies. + scope: Highest adaptation layer allowed during the phase. + """ + + model_config = ConfigDict(extra="forbid") + + start_iteration: Annotated[ + int, + Field( + ge=1, + title="Start iteration", + description="First simulation iteration where this behavior-change phase applies.", + ), + ] + + scope: Annotated[ + BehaviorChangeScope, + Field( + title="Behavior change scope", + description=( + "Highest adaptation layer allowed during the phase. " + "`full_replanning` resamples motive, destination, and mode " + "sequences. `destination_replanning` keeps each currently " + "occupied non-stay-home motive sequence fixed and resamples " + "destination plus mode sequences. `mode_replanning` keeps each " + "currently occupied non-stay-home motive and destination " + "sequence fixed and resamples mode sequences only. " + "Stay-home is frozen in restricted phases." 
+ ), + ), + ] + class PopulationTripsParameters(BaseModel): @@ -146,3 +204,67 @@ class PopulationTripsParameters(BaseModel): description="Wether to simulate a weekend day or only a week day.", ), ] + + behavior_change_phases: Annotated[ + list[BehaviorChangePhase] | None, + Field( + default=None, + title="Behavior change phases", + description=( + "Optional per-iteration adaptation policy. Each phase starts at a " + "given iteration and selects the highest state layer that may " + "adapt: `mode_replanning`, `destination_replanning`, or " + "`full_replanning`. Restricted phases apply to currently " + "occupied non-stay-home states and freeze stay-home. If omitted, " + "all iterations use `full_replanning`." + ), + ), + ] + + @model_validator(mode="after") + def validate_behavior_change_phases(self) -> "PopulationTripsParameters": + """Ensure phase definitions are sorted and non-overlapping. + + Returns: + The validated parameter object. + + Raises: + ValueError: If behavior-change phases are not sorted by + ``start_iteration`` or if two phases start on the same + iteration. + """ + if self.behavior_change_phases is None: + return self + + start_iterations = [phase.start_iteration for phase in self.behavior_change_phases] + if start_iterations != sorted(start_iterations): + raise ValueError("PopulationTripsParameters.behavior_change_phases must be sorted by start_iteration.") + + if len(start_iterations) != len(set(start_iterations)): + raise ValueError("PopulationTripsParameters.behavior_change_phases cannot define the same start_iteration twice.") + + return self + + def get_behavior_change_scope(self, iteration: int) -> BehaviorChangeScope: + """Return the active behavior-change scope for a given iteration. + + Args: + iteration: Current simulation iteration (1-based). + + Returns: + The active behavior-change scope. If no phase applies yet, returns + ``BehaviorChangeScope.FULL_REPLANNING``. 
+ """ + if self.behavior_change_phases is None: + return BehaviorChangeScope.FULL_REPLANNING + + active_phase = None + for phase in self.behavior_change_phases: + if phase.start_iteration > iteration: + break + active_phase = phase + + if active_phase is None: + return BehaviorChangeScope.FULL_REPLANNING + + return active_phase.scope diff --git a/mobility/choice_models/state_updater.py b/mobility/choice_models/state_updater.py index fcd30ab..33cb394 100644 --- a/mobility/choice_models/state_updater.py +++ b/mobility/choice_models/state_updater.py @@ -4,6 +4,7 @@ import polars as pl +from mobility.choice_models.population_trips_parameters import BehaviorChangeScope from mobility.choice_models.transition_schema import TRANSITION_EVENT_COLUMNS class StateUpdater: @@ -65,7 +66,8 @@ def get_new_states( iteration, motives, parameters.min_activity_time_constant, - tmp_folders + tmp_folders, + parameters.get_behavior_change_scope(iteration), ) self._assert_current_states_covered_by_possible_steps( current_states, @@ -83,7 +85,11 @@ def get_new_states( parameters.min_activity_time_constant ) - transition_prob = self.get_transition_probabilities(current_states, possible_states_utility) + transition_prob = self.get_transition_probabilities( + current_states, + possible_states_utility, + parameters.get_behavior_change_scope(iteration), + ) current_states, transition_events = self.apply_transitions(current_states, transition_prob, iteration) transition_events = self.add_transition_state_details(transition_events, possible_states_steps) current_states_steps = self.get_current_states_steps(current_states, possible_states_steps) @@ -144,7 +150,8 @@ def get_possible_states_steps( iteration, motives, min_activity_time_constant, - tmp_folders + tmp_folders, + behavior_change_scope: BehaviorChangeScope, ): """Enumerate candidate state steps and compute per-step utilities. @@ -160,9 +167,13 @@ def get_possible_states_steps( sinks (pl.DataFrame): Sink state per (motive,to). 
motive_dur (pl.DataFrame): Mean durations per (csp,motive). iteration (int): Current iteration to pick latest artifacts. - activity_utility_coeff (float): Coefficient for activity utility. + motives (list[Motive]): Available motives for the simulation. + min_activity_time_constant (float): Controls the minimum activity + time used in utility calculations. tmp_folders (dict[str, pathlib.Path]): Must contain "spatialized-chains" and "modes". - + behavior_change_scope (BehaviorChangeScope): Highest level of + behavior change reachable during the current iteration. + Returns: pl.DataFrame: Candidate per-step rows with columns including ["demand_group_id","csp","motive_seq_id","dest_seq_id","mode_seq_id", @@ -242,8 +253,62 @@ def get_possible_states_steps( ) ) + + possible_states_steps = self.filter_reachable_possible_states_steps( + possible_states_steps=possible_states_steps, + current_states=current_states, + behavior_change_scope=behavior_change_scope, + ) return possible_states_steps + + def filter_reachable_possible_states_steps( + self, + possible_states_steps: pl.LazyFrame, + current_states: pl.DataFrame, + behavior_change_scope: BehaviorChangeScope, + ) -> pl.LazyFrame: + """Restrict reachable candidate states to the active behavior scope. + + Args: + possible_states_steps: Candidate state-step rows before reachability + filtering. + current_states: Aggregate states occupied before the iteration + update. + behavior_change_scope: Highest level of behavior change reachable + during the iteration. + + Returns: + Reachable candidate state-step rows for the current scope. 
+ """ + active_states = ( + current_states + .filter(pl.col("motive_seq_id") != 0) + .select(["demand_group_id", "motive_seq_id", "dest_seq_id"]) + .unique() + ) + + if behavior_change_scope == BehaviorChangeScope.FULL_REPLANNING: + return possible_states_steps + + if active_states.height == 0: + return possible_states_steps.filter(pl.lit(False)) + + if behavior_change_scope == BehaviorChangeScope.DESTINATION_REPLANNING: + return possible_states_steps.join( + active_states.lazy().select(["demand_group_id", "motive_seq_id"]).unique(), + on=["demand_group_id", "motive_seq_id"], + how="inner", + ) + + if behavior_change_scope == BehaviorChangeScope.MODE_REPLANNING: + return possible_states_steps.join( + active_states.lazy(), + on=["demand_group_id", "motive_seq_id", "dest_seq_id"], + how="inner", + ) + + raise ValueError(f"Unsupported behavior change scope: {behavior_change_scope}") def get_possible_states_utility( @@ -323,6 +388,7 @@ def get_transition_probabilities( self, current_states: pl.DataFrame, possible_states_utility: pl.LazyFrame, + behavior_change_scope: BehaviorChangeScope, transition_cost: float = 0.0 ) -> pl.DataFrame: """Compute transition probabilities from current to candidate states. @@ -333,6 +399,11 @@ def get_transition_probabilities( Args: current_states (pl.DataFrame): Current states with utilities. possible_states_utility (pl.DataFrame): Candidate states with utilities. + behavior_change_scope (BehaviorChangeScope): Highest level of + behavior change reachable during the iteration. Restricted + scopes freeze stay-home and require the destination state to + share the appropriate motive/destination keys with the origin + state. 
Returns: pl.DataFrame: Transitions with @@ -343,21 +414,38 @@ def get_transition_probabilities( state_cols = ["demand_group_id", "motive_seq_id", "dest_seq_id", "mode_seq_id"] + current_states_for_transitions = current_states.lazy() + possible_states_utility_for_transitions = possible_states_utility + + if behavior_change_scope != BehaviorChangeScope.FULL_REPLANNING: + current_states_for_transitions = current_states_for_transitions.filter(pl.col("mode_seq_id") != 0) + possible_states_utility_for_transitions = possible_states_utility_for_transitions.filter(pl.col("mode_seq_id") != 0) + + scope_pair_constraint = pl.lit(True) + if behavior_change_scope == BehaviorChangeScope.DESTINATION_REPLANNING: + scope_pair_constraint = pl.col("motive_seq_id") == pl.col("motive_seq_id_trans") + elif behavior_change_scope == BehaviorChangeScope.MODE_REPLANNING: + scope_pair_constraint = ( + (pl.col("motive_seq_id") == pl.col("motive_seq_id_trans")) + & (pl.col("dest_seq_id") == pl.col("dest_seq_id_trans")) + ) + transition_probabilities = ( - current_states.lazy() + current_states_for_transitions .select(state_cols + ["utility"]) .rename({"utility": "utility_prev_from"}) - + # Join the updated utility of the current states - .join(possible_states_utility, on=state_cols) - + .join(possible_states_utility_for_transitions, on=state_cols) + # Join the possible states when they can improve the utility compared to the current states # (join also the current state so it is included in the probability calculation) .join_where( - possible_states_utility, + possible_states_utility_for_transitions, ( (pl.col("demand_group_id") == pl.col("demand_group_id_trans")) & + scope_pair_constraint & ( (pl.col("utility_trans") > pl.col("utility") - 5.0) | ( @@ -591,7 +679,7 @@ def add_transition_state_details( activity_time=pl.col("duration_per_pers").fill_null(0.0).sum(), travel_time=pl.col("time").fill_null(0.0).sum(), distance=pl.col("distance").fill_null(0.0).sum(), - 
steps=pl.col("step_desc").sort_by("seq_step_index").str.concat("
"), + steps=pl.col("step_desc").sort_by("seq_step_index").str.join("
"), ) .collect(engine="streaming") ) diff --git a/tests/back/integration/test_011_population_trips_behavior_change_phases_can_be_computed.py b/tests/back/integration/test_011_population_trips_behavior_change_phases_can_be_computed.py new file mode 100644 index 0000000..a3d92d2 --- /dev/null +++ b/tests/back/integration/test_011_population_trips_behavior_change_phases_can_be_computed.py @@ -0,0 +1,125 @@ +import polars as pl +import pytest + +import mobility +from mobility.choice_models.population_trips import PopulationTrips +from mobility.choice_models.population_trips_parameters import ( + BehaviorChangePhase, + BehaviorChangeScope, + PopulationTripsParameters, +) +from mobility.motives import HomeMotive, OtherMotive, WorkMotive +from mobility.parsers.mobility_survey.france import EMPMobilitySurvey + +@pytest.mark.dependency( + depends=[ + "tests/back/integration/test_008_population_trips_can_be_computed.py::test_008_population_trips_can_be_computed" + ], + scope="session", +) +def test_011_population_trips_behavior_change_phases_can_be_computed(test_data): + transport_zones = mobility.TransportZones( + local_admin_unit_id=test_data["transport_zones_local_admin_unit_id"], + radius=test_data["transport_zones_radius"], + ) + emp = EMPMobilitySurvey() + + pop = mobility.Population( + transport_zones, + sample_size=test_data["population_sample_size"], + ) + + pop_trips = PopulationTrips( + population=pop, + modes=[mobility.CarMode(transport_zones), mobility.WalkMode(transport_zones)], + motives=[HomeMotive(), WorkMotive(), OtherMotive(population=pop)], + surveys=[emp], + parameters=PopulationTripsParameters( + n_iterations=3, + n_iter_per_cost_update=0, + alpha=0.01, + dest_prob_cutoff=0.9, + k_mode_sequences=3, + cost_uncertainty_sd=1.0, + mode_sequence_search_parallel=False, + seed=0, + behavior_change_phases=[ + BehaviorChangePhase(start_iteration=1, scope=BehaviorChangeScope.FULL_REPLANNING), + BehaviorChangePhase(start_iteration=2, 
scope=BehaviorChangeScope.MODE_REPLANNING), + BehaviorChangePhase(start_iteration=3, scope=BehaviorChangeScope.DESTINATION_REPLANNING), + ], + ), + ) + + pop_trips.remove() + result = pop_trips.get() + weekday_flows = result["weekday_flows"].collect() + weekday_transitions = result["weekday_transitions"].collect() + cache_path = pop_trips.cache_path["weekday_flows"] + inputs_hash = str(cache_path.stem).split("-")[0] + spatialized_chains_dir = cache_path.parent / f"{inputs_hash}-spatialized-chains" + spatialized_chains_1 = pl.read_parquet(spatialized_chains_dir / "spatialized_chains_1.parquet") + spatialized_chains_2 = pl.read_parquet(spatialized_chains_dir / "spatialized_chains_2.parquet") + spatialized_chains_3 = pl.read_parquet(spatialized_chains_dir / "spatialized_chains_3.parquet") + + assert weekday_flows.height > 0 + assert weekday_transitions.height > 0 + assert weekday_transitions["iteration"].unique().sort().to_list() == [1, 2, 3] + assert spatialized_chains_1.height > 0 + assert spatialized_chains_2.height > 0 + assert spatialized_chains_3.height > 0 + + bad_mode = weekday_transitions.filter( + (pl.col("iteration") == 2) + & ( + (pl.col("motive_seq_id") != pl.col("motive_seq_id_trans")) + | (pl.col("dest_seq_id") != pl.col("dest_seq_id_trans")) + ) + ) + bad_destination = weekday_transitions.filter( + (pl.col("iteration") == 3) + & (pl.col("motive_seq_id") != pl.col("motive_seq_id_trans")) + ) + + mode_replanning_dest_keys = ( + spatialized_chains_2 + .select(["demand_group_id", "motive_seq_id", "dest_seq_id"]) + .unique() + ) + initial_dest_keys = ( + spatialized_chains_1 + .select(["demand_group_id", "motive_seq_id", "dest_seq_id"]) + .unique() + ) + destination_replanning_motive_keys = ( + spatialized_chains_3 + .select(["demand_group_id", "motive_seq_id"]) + .unique() + ) + mode_replanning_motive_keys = ( + spatialized_chains_2 + .select(["demand_group_id", "motive_seq_id"]) + .unique() + ) + + new_dest_keys_in_mode_replanning = ( + 
mode_replanning_dest_keys + .join( + initial_dest_keys, + on=["demand_group_id", "motive_seq_id", "dest_seq_id"], + how="anti", + ) + ) + new_motive_keys_in_destination_replanning = ( + destination_replanning_motive_keys + .join( + mode_replanning_motive_keys, + on=["demand_group_id", "motive_seq_id"], + how="anti", + ) + ) + + assert new_dest_keys_in_mode_replanning.height == 0 + assert new_motive_keys_in_destination_replanning.height == 0 + assert bad_mode.height == 0 + assert bad_destination.height == 0 diff --git a/tests/back/unit/choice_models/test_001_transition_scopes.py b/tests/back/unit/choice_models/test_001_transition_scopes.py new file mode 100644 index 0000000..035bdcf --- /dev/null +++ b/tests/back/unit/choice_models/test_001_transition_scopes.py @@ -0,0 +1,531 @@ +import shutil +import uuid +from pathlib import Path + +import polars as pl + +from mobility.choice_models.population_trips_candidates import ( + get_active_destination_sequences, + get_active_motive_chains, + get_spatialized_chains, +) +from mobility.choice_models.population_trips_parameters import ( + BehaviorChangePhase, + BehaviorChangeScope, + PopulationTripsParameters, +) +from mobility.choice_models.state_updater import StateUpdater + + +def test_population_trips_parameters_returns_default_behavior_change_scope(): + parameters = PopulationTripsParameters() + + assert parameters.get_behavior_change_scope(1) == BehaviorChangeScope.FULL_REPLANNING + + +def test_population_trips_parameters_resolves_active_behavior_change_scope(): + parameters = PopulationTripsParameters( + behavior_change_phases=[ + BehaviorChangePhase(start_iteration=3, scope=BehaviorChangeScope.MODE_REPLANNING), + BehaviorChangePhase(start_iteration=5, scope=BehaviorChangeScope.DESTINATION_REPLANNING), + ] + ) + + assert parameters.get_behavior_change_scope(1) == BehaviorChangeScope.FULL_REPLANNING + assert parameters.get_behavior_change_scope(3) == BehaviorChangeScope.MODE_REPLANNING + assert 
parameters.get_behavior_change_scope(6) == BehaviorChangeScope.DESTINATION_REPLANNING


def test_get_active_motive_chains_keeps_only_non_stay_home_sequences():
    """Only motive chains whose keys match a non stay-home current state are kept."""
    chains_by_motive = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 2],
            "motive_seq_id": [10, 20, 30],
            "seq_step_index": [0, 0, 0],
            "motive": ["work", "other", "work"],
        },
        schema={"demand_group_id": pl.UInt32, "motive_seq_id": pl.UInt32, "seq_step_index": pl.UInt32, "motive": pl.Utf8},
    )
    # motive_seq_id == 0 encodes the stay-home state; only (1, 10) is active.
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 2],
            "motive_seq_id": [10, 0, 0],
            "dest_seq_id": [100, 0, 0],
            "mode_seq_id": [1000, 0, 0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    result = get_active_motive_chains(chains_by_motive, current_states)

    assert result.select(["demand_group_id", "motive_seq_id"]).to_dicts() == [
        {"demand_group_id": 1, "motive_seq_id": 10}
    ]


def test_get_active_motive_chains_deduplicates_active_motive_keys():
    """Duplicate (demand_group_id, motive_seq_id) state rows must not duplicate chain steps."""
    chains_by_motive = pl.DataFrame(
        {
            "demand_group_id": [1, 1],
            "motive_seq_id": [10, 10],
            "seq_step_index": [0, 1],
            "motive": ["work", "home"],
        },
        schema={"demand_group_id": pl.UInt32, "motive_seq_id": pl.UInt32, "seq_step_index": pl.UInt32, "motive": pl.Utf8},
    )
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1, 1],
            "motive_seq_id": [10, 10],
            "dest_seq_id": [100, 101],
            "mode_seq_id": [1000, 1001],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    result = get_active_motive_chains(chains_by_motive, current_states)

    assert result.height == 2
    # Sort before comparing: polars does not guarantee row order after
    # dedup/joins (matches the destination-sequences dedup test below).
    assert result["seq_step_index"].sort().to_list() == [0, 1]


def test_get_active_destination_sequences_uses_latest_matching_chain():
    """The chain row from the most recent iteration file wins, restamped with the current iteration."""
    spatialized_dir = Path("tests/back/unit/choice_models/.tmp") / str(uuid.uuid4()) / "spatialized-chains"
    spatialized_dir.mkdir(parents=True, exist_ok=True)

    try:
        pl.DataFrame(
            {
                "demand_group_id": [1],
                "motive_seq_id": [10],
                "dest_seq_id": [100],
                "seq_step_index": [0],
                "from": [11],
                "to": [12],
                "iteration": [1],
            },
            schema={
                "demand_group_id": pl.UInt32,
                "motive_seq_id": pl.UInt32,
                "dest_seq_id": pl.UInt32,
                "seq_step_index": pl.UInt32,
                "from": pl.Int32,
                "to": pl.Int32,
                "iteration": pl.UInt32,
            },
        ).write_parquet(spatialized_dir / "spatialized_chains_1.parquet")
        pl.DataFrame(
            {
                "demand_group_id": [1],
                "motive_seq_id": [10],
                "dest_seq_id": [100],
                "seq_step_index": [0],
                "from": [21],
                "to": [22],
                "iteration": [2],
            },
            schema={
                "demand_group_id": pl.UInt32,
                "motive_seq_id": pl.UInt32,
                "dest_seq_id": pl.UInt32,
                "seq_step_index": pl.UInt32,
                "from": pl.Int32,
                "to": pl.Int32,
                "iteration": pl.UInt32,
            },
        ).write_parquet(spatialized_dir / "spatialized_chains_2.parquet")

        current_states = pl.DataFrame(
            {
                "demand_group_id": [1],
                "motive_seq_id": [10],
                "dest_seq_id": [100],
                "mode_seq_id": [1000],
            },
            schema={
                "demand_group_id": pl.UInt32,
                "motive_seq_id": pl.UInt32,
                "dest_seq_id": pl.UInt32,
                "mode_seq_id": pl.UInt32,
            },
        )

        result = get_active_destination_sequences(
            current_states=current_states,
            iteration=4,
            tmp_folders={"spatialized-chains": spatialized_dir},
        )

        # from/to come from iteration 2 (the latest file); "iteration" is
        # restamped to the requested iteration (4).
        assert result.to_dicts() == [
            {
                "demand_group_id": 1,
                "motive_seq_id": 10,
                "dest_seq_id": 100,
                "seq_step_index": 0,
                "from": 21,
                "to": 22,
                "iteration": 4,
            }
        ]
    finally:
        shutil.rmtree(spatialized_dir.parent, ignore_errors=True)


def test_get_active_destination_sequences_deduplicates_active_destination_keys():
    """Duplicate (demand_group, motive_seq, dest_seq) state rows must not duplicate destination steps."""
    spatialized_dir = Path("tests/back/unit/choice_models/.tmp") / str(uuid.uuid4()) / "spatialized-chains"
    spatialized_dir.mkdir(parents=True, exist_ok=True)

    try:
        pl.DataFrame(
            {
                "demand_group_id": [1, 1],
                "motive_seq_id": [10, 10],
                "dest_seq_id": [100, 100],
                "seq_step_index": [0, 1],
                "from": [21, 22],
                "to": [22, 23],
                "iteration": [2, 2],
            },
            schema={
                "demand_group_id": pl.UInt32,
                "motive_seq_id": pl.UInt32,
                "dest_seq_id": pl.UInt32,
                "seq_step_index": pl.UInt32,
                "from": pl.Int32,
                "to": pl.Int32,
                "iteration": pl.UInt32,
            },
        ).write_parquet(spatialized_dir / "spatialized_chains_2.parquet")

        current_states = pl.DataFrame(
            {
                "demand_group_id": [1, 1],
                "motive_seq_id": [10, 10],
                "dest_seq_id": [100, 100],
                "mode_seq_id": [1000, 1001],
            },
            schema={
                "demand_group_id": pl.UInt32,
                "motive_seq_id": pl.UInt32,
                "dest_seq_id": pl.UInt32,
                "mode_seq_id": pl.UInt32,
            },
        )

        result = get_active_destination_sequences(
            current_states=current_states,
            iteration=4,
            tmp_folders={"spatialized-chains": spatialized_dir},
        )

        assert result.height == 2
        assert result["seq_step_index"].sort().to_list() == [0, 1]
    finally:
        shutil.rmtree(spatialized_dir.parent, ignore_errors=True)


def test_get_spatialized_chains_limits_destination_resampling_to_active_motives():
    """Under DESTINATION_REPLANNING only chains of active (non stay-home) motives reach the sampler."""

    class DummySampler:
        # Records the chains argument so the test can inspect what was resampled.
        def __init__(self):
            self.seen_chains = None

        def run(self, motives, transport_zones, remaining_sinks, iteration, chains, demand_groups, costs, tmp_folders, parameters, seed):
            self.seen_chains = chains
            return pl.DataFrame(
                {
                    "demand_group_id": [1],
                    "motive_seq_id": [10],
                    "dest_seq_id": [100],
                    "seq_step_index": [0],
                    "from": [1],
                    "to": [2],
                    "iteration": [iteration],
                },
                schema={
                    "demand_group_id": pl.UInt32,
                    "motive_seq_id": pl.UInt32,
                    "dest_seq_id": pl.UInt32,
                    "seq_step_index": pl.UInt32,
                    "from": pl.Int32,
                    "to": pl.Int32,
                    "iteration": pl.UInt32,
                },
            )

    destination_sequence_sampler = DummySampler()

    chains_by_motive = pl.DataFrame(
        {
            "demand_group_id": [1, 1],
            "motive_seq_id": [10, 20],
            "seq_step_index": [0, 0],
            "motive": ["work", "other"],
        },
        schema={"demand_group_id": pl.UInt32, "motive_seq_id": pl.UInt32, "seq_step_index": pl.UInt32, "motive": pl.Utf8},
    )
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [10],
            "dest_seq_id": [100],
            "mode_seq_id": [1000],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    get_spatialized_chains(
        behavior_change_scope=BehaviorChangeScope.DESTINATION_REPLANNING,
        current_states=current_states,
        destination_sequence_sampler=destination_sequence_sampler,
        motives=[],
        transport_zones=None,
        remaining_sinks=pl.DataFrame(),
        iteration=3,
        chains_by_motive=chains_by_motive,
        demand_groups=pl.DataFrame(),
        costs=pl.DataFrame(),
        tmp_folders={"spatialized-chains": None},
        parameters=PopulationTripsParameters(),
        seed=123,
    )

    # Chain 20 has no matching current state, so it must not be resampled.
    assert destination_sequence_sampler.seen_chains.select("motive_seq_id").to_series().to_list() == [10]


def test_filter_reachable_possible_states_steps_limits_mode_replanning_to_active_destinations():
    """MODE_REPLANNING keeps only steps on the currently chosen motive/destination sequences."""
    updater = StateUpdater()
    possible_states_steps = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 1],
            "motive_seq_id": [10, 10, 11],
            "dest_seq_id": [100, 101, 100],
            "mode_seq_id": [1000, 1001, 1002],
            "seq_step_index": [0, 0, 0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "seq_step_index": pl.UInt32,
        },
    ).lazy()
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [10],
            "dest_seq_id": [100],
            "mode_seq_id": [999],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    result = updater.filter_reachable_possible_states_steps(
        possible_states_steps=possible_states_steps,
        current_states=current_states,
        behavior_change_scope=BehaviorChangeScope.MODE_REPLANNING,
    ).collect()

    assert result.select(["motive_seq_id", "dest_seq_id"]).unique().to_dicts() == [
        {"motive_seq_id": 10, "dest_seq_id": 100}
    ]


def test_filter_reachable_possible_states_steps_limits_destination_replanning_to_active_motives():
    """DESTINATION_REPLANNING keeps only steps on the currently chosen motive sequences."""
    updater = StateUpdater()
    possible_states_steps = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 1],
            "motive_seq_id": [10, 10, 11],
            "dest_seq_id": [100, 101, 100],
            "mode_seq_id": [1000, 1001, 1002],
            "seq_step_index": [0, 0, 0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "seq_step_index": pl.UInt32,
        },
    ).lazy()
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [10],
            "dest_seq_id": [100],
            "mode_seq_id": [999],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    result = updater.filter_reachable_possible_states_steps(
        possible_states_steps=possible_states_steps,
        current_states=current_states,
        behavior_change_scope=BehaviorChangeScope.DESTINATION_REPLANNING,
    ).collect()

    assert result.select("motive_seq_id").unique().to_series().to_list() == [10]


def test_filter_reachable_possible_states_steps_keeps_only_stay_home_when_no_active_states():
    """With a stay-home-only current state, no non-stay-home candidate step survives."""
    updater = StateUpdater()
    possible_states_steps = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [10],
            "dest_seq_id": [100],
            "mode_seq_id": [1000],
            "seq_step_index": [0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "seq_step_index": pl.UInt32,
        },
    ).lazy()
    # All-zero ids encode the stay-home state.
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [0],
            "dest_seq_id": [0],
            "mode_seq_id": [0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
        },
    )

    result = updater.filter_reachable_possible_states_steps(
        possible_states_steps=possible_states_steps,
        current_states=current_states,
        behavior_change_scope=BehaviorChangeScope.MODE_REPLANNING,
    ).collect()

    assert result.height == 0


def test_get_transition_probabilities_blocks_stay_home_in_mode_replanning():
    """MODE_REPLANNING transitions stay on the current motive/destination and never to stay-home."""
    updater = StateUpdater()
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1, 1],
            "motive_seq_id": [0, 10],
            "dest_seq_id": [0, 100],
            "mode_seq_id": [0, 1000],
            "utility": [0.0, 1.0],
            "n_persons": [5.0, 5.0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "utility": pl.Float64,
            "n_persons": pl.Float64,
        },
    )
    # Stay-home (0, 0, 0) has the highest utility on purpose: it must still
    # be excluded from mode-replanning transitions.
    possible_states_utility = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 1, 1],
            "motive_seq_id": [0, 10, 10, 11],
            "dest_seq_id": [0, 100, 101, 100],
            "mode_seq_id": [0, 1001, 1002, 1003],
            "utility": [10.0, 2.0, 3.0, 4.0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "utility": pl.Float64,
        },
    ).lazy()

    result = updater.get_transition_probabilities(
        current_states=current_states,
        possible_states_utility=possible_states_utility,
        behavior_change_scope=BehaviorChangeScope.MODE_REPLANNING,
    )

    assert result.filter(pl.col("motive_seq_id") != 10).height == 0
    assert result.filter(pl.col("dest_seq_id") != 100).height == 0
    assert result.filter(pl.col("motive_seq_id_trans") != 10).height == 0
    assert result.filter(pl.col("dest_seq_id_trans") != 100).height == 0
    assert result.filter(pl.col("motive_seq_id_trans") == 0).height == 0


def test_get_transition_probabilities_limits_destination_replanning_to_same_motive():
    """DESTINATION_REPLANNING transitions never change the motive sequence."""
    updater = StateUpdater()
    current_states = pl.DataFrame(
        {
            "demand_group_id": [1],
            "motive_seq_id": [10],
            "dest_seq_id": [100],
            "mode_seq_id": [1000],
            "utility": [1.0],
            "n_persons": [5.0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "utility": pl.Float64,
            "n_persons": pl.Float64,
        },
    )
    possible_states_utility = pl.DataFrame(
        {
            "demand_group_id": [1, 1, 1],
            "motive_seq_id": [10, 10, 11],
            "dest_seq_id": [100, 101, 100],
            "mode_seq_id": [1000, 1001, 1002],
            "utility": [1.0, 2.0, 3.0],
        },
        schema={
            "demand_group_id": pl.UInt32,
            "motive_seq_id": pl.UInt32,
            "dest_seq_id": pl.UInt32,
            "mode_seq_id": pl.UInt32,
            "utility": pl.Float64,
        },
    ).lazy()

    result = updater.get_transition_probabilities(
        current_states=current_states,
        possible_states_utility=possible_states_utility,
        behavior_change_scope=BehaviorChangeScope.DESTINATION_REPLANNING,
    )

    assert result.select("motive_seq_id_trans").unique().to_series().to_list() == [10]