Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mobility/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@


from mobility.choice_models.population_trips import PopulationTrips
from mobility.choice_models.population_trips_parameters import PopulationTripsParameters
from mobility.choice_models.population_trips_parameters import PopulationTripsParameters, BehaviorChangePhase, BehaviorChangeScope


from mobility.transport_graphs.speed_modifier import (
Expand Down
63 changes: 37 additions & 26 deletions mobility/choice_models/population_trips.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
from mobility.file_asset import FileAsset
from mobility.population import Population
from mobility.choice_models.travel_costs_aggregator import TravelCostsAggregator
from mobility.choice_models.population_trips_parameters import PopulationTripsParameters
from mobility.choice_models.population_trips_parameters import (
PopulationTripsParameters,
)
from mobility.choice_models.population_trips_candidates import (
get_mode_sequences,
get_spatialized_chains,
)
from mobility.choice_models.destination_sequence_sampler import DestinationSequenceSampler
from mobility.choice_models.top_k_mode_sequence_search import TopKModeSequenceSearch
from mobility.choice_models.state_initializer import StateInitializer
Expand Down Expand Up @@ -375,32 +381,36 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da

seed = self.rng.getrandbits(64)

(
self.destination_sequence_sampler.run(
motives,
population.transport_zones,
remaining_sinks,
iteration,
chains_by_motive,
demand_groups,
costs,
tmp_folders,
parameters,
seed
)
.write_parquet(tmp_folders["spatialized-chains"] / f"spatialized_chains_{iteration}.parquet")
behavior_change_scope = parameters.get_behavior_change_scope(iteration)

spatialized_chains = get_spatialized_chains(
behavior_change_scope=behavior_change_scope,
current_states=current_states,
destination_sequence_sampler=self.destination_sequence_sampler,
motives=motives,
transport_zones=population.transport_zones,
remaining_sinks=remaining_sinks,
iteration=iteration,
chains_by_motive=chains_by_motive,
demand_groups=demand_groups,
costs=costs,
tmp_folders=tmp_folders,
parameters=parameters,
seed=seed,
)


(
self.top_k_mode_sequence_search.run(
iteration,
costs_aggregator,
tmp_folders,
parameters
)
.write_parquet(tmp_folders["modes"] / f"mode_sequences_{iteration}.parquet")
spatialized_chains.write_parquet(
tmp_folders["spatialized-chains"] / f"spatialized_chains_{iteration}.parquet"
)

mode_sequences = get_mode_sequences(
spatialized_chains=spatialized_chains,
top_k_mode_sequence_search=self.top_k_mode_sequence_search,
iteration=iteration,
costs_aggregator=costs_aggregator,
tmp_folders=tmp_folders,
parameters=parameters,
)
mode_sequences.write_parquet(tmp_folders["modes"] / f"mode_sequences_{iteration}.parquet")

current_states, current_states_steps, transition_events = self.state_updater.get_new_states(
current_states,
Expand Down Expand Up @@ -459,7 +469,8 @@ def run_model(self, is_weekday: bool) -> Tuple[pl.DataFrame, pl.DataFrame, pl.Da
parameters.n_iterations,
motives,
parameters.min_activity_time_constant,
tmp_folders
tmp_folders,
parameters.get_behavior_change_scope(parameters.n_iterations),
)
current_states_steps = self.state_updater.get_current_states_steps(current_states, possible_states_steps)

Expand Down
283 changes: 283 additions & 0 deletions mobility/choice_models/population_trips_candidates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
import pathlib

import polars as pl

from mobility.choice_models.population_trips_parameters import (
BehaviorChangeScope,
PopulationTripsParameters,
)


def get_spatialized_chains(
behavior_change_scope: BehaviorChangeScope,
current_states: pl.DataFrame,
destination_sequence_sampler,
motives,
transport_zones,
remaining_sinks: pl.DataFrame,
iteration: int,
chains_by_motive: pl.DataFrame,
demand_groups: pl.DataFrame,
costs: pl.DataFrame,
tmp_folders: dict[str, pathlib.Path],
parameters: PopulationTripsParameters,
seed: int,
) -> pl.DataFrame:
"""Get spatialized chains for the current simulation step.

Args:
behavior_change_scope: Active behavior-change scope for the step.
current_states: Aggregate states occupied before the step update.
destination_sequence_sampler: Sampler used when destination resampling
is allowed.
motives: Available motives for the simulation.
transport_zones: Transport zones used to spatialize destinations.
remaining_sinks: Remaining destination capacities.
iteration: Current simulation iteration number.
chains_by_motive: Full chain templates indexed by motive sequence.
demand_groups: Demand-group metadata used during spatialization.
costs: Current OD costs used by destination sampling.
tmp_folders: Temporary folders for intermediate iteration artifacts.
parameters: PopulationTrips parameters.
seed: RNG seed used for destination sampling.

Returns:
Spatialized chains to use for the current step.
"""
if behavior_change_scope == BehaviorChangeScope.FULL_REPLANNING:
chains_to_sample = chains_by_motive
elif behavior_change_scope == BehaviorChangeScope.DESTINATION_REPLANNING:
chains_to_sample = get_active_motive_chains(
chains_by_motive=chains_by_motive,
current_states=current_states,
)
elif behavior_change_scope == BehaviorChangeScope.MODE_REPLANNING:
return get_active_destination_sequences(
current_states=current_states,
iteration=iteration,
tmp_folders=tmp_folders,
)
else:
raise ValueError(f"Unsupported behavior change scope: {behavior_change_scope}")

if chains_to_sample.height == 0:
if get_active_non_stay_home_states(current_states).height > 0:
raise ValueError(
"No chains available for active non-stay-home states at "
f"iteration={iteration} with behavior_change_scope={behavior_change_scope.value}."
)
return empty_spatialized_chains()

return destination_sequence_sampler.run(
motives,
transport_zones,
remaining_sinks,
iteration,
chains_to_sample,
demand_groups,
costs,
tmp_folders,
parameters,
seed,
)


def get_mode_sequences(
spatialized_chains: pl.DataFrame,
top_k_mode_sequence_search,
iteration: int,
costs_aggregator,
tmp_folders: dict[str, pathlib.Path],
parameters: PopulationTripsParameters,
) -> pl.DataFrame:
"""Get mode sequences for the current simulation step.

Args:
spatialized_chains: Spatialized chains selected for the current step.
top_k_mode_sequence_search: Searcher that computes top-k mode
sequences for each spatialized chain.
iteration: Current simulation iteration number.
costs_aggregator: Provides OD costs by transport mode.
tmp_folders: Temporary folders for intermediate iteration artifacts.
parameters: PopulationTrips parameters.

Returns:
Mode sequences to use for the current step.
"""
if spatialized_chains.height == 0:
return empty_mode_sequences()

return top_k_mode_sequence_search.run(
iteration,
costs_aggregator,
tmp_folders,
parameters,
)


def get_active_motive_chains(
chains_by_motive: pl.DataFrame,
current_states: pl.DataFrame,
) -> pl.DataFrame:
"""Keep chain templates for motive sequences currently selected.

Args:
chains_by_motive: Full chain-template table.
current_states: Aggregate states occupied before the step update.

Returns:
Chain templates restricted to active non-stay-home motive sequences.
"""
active_motive_sequences = get_active_non_stay_home_states(current_states).select(
["demand_group_id", "motive_seq_id"]
).unique()

if active_motive_sequences.height == 0:
return chains_by_motive.head(0)

active_motive_sequences = active_motive_sequences.with_columns(
demand_group_id=pl.col("demand_group_id").cast(chains_by_motive.schema["demand_group_id"]),
motive_seq_id=pl.col("motive_seq_id").cast(chains_by_motive.schema["motive_seq_id"]),
)

return chains_by_motive.join(
active_motive_sequences,
on=["demand_group_id", "motive_seq_id"],
how="inner",
)


def get_active_destination_sequences(
current_states: pl.DataFrame,
iteration: int,
tmp_folders: dict[str, pathlib.Path],
) -> pl.DataFrame:
"""Reuse active destination sequences and tag them for a new iteration.

Args:
current_states: Aggregate states occupied before the step update.
iteration: Current simulation iteration number.
tmp_folders: Temporary folders containing prior spatialized chains.

Returns:
Spatialized chains matching the active non-stay-home destination
sequences, restamped with the current iteration.
"""
active_dest_sequences = get_active_non_stay_home_states(current_states).select(
["demand_group_id", "motive_seq_id", "dest_seq_id"]
).unique()

if active_dest_sequences.height == 0:
return empty_spatialized_chains()

available_chains = get_latest_spatialized_chains(tmp_folders)
if available_chains is None:
raise ValueError(
"No prior spatialized chains available for active non-stay-home "
f"states at iteration={iteration}."
)

active_dest_sequences = active_dest_sequences.with_columns(
demand_group_id=pl.col("demand_group_id").cast(pl.UInt32),
motive_seq_id=pl.col("motive_seq_id").cast(pl.UInt32),
dest_seq_id=pl.col("dest_seq_id").cast(pl.UInt32),
)

reused = (
available_chains
.join(
active_dest_sequences.lazy(),
on=["demand_group_id", "motive_seq_id", "dest_seq_id"],
how="inner",
)
.with_columns(iteration=pl.lit(iteration).cast(pl.UInt32()))
.collect(engine="streaming")
)

if reused.height == 0:
raise ValueError(
"Active non-stay-home states could not be matched to reusable "
f"destination chains at iteration={iteration}."
)

return reused


def get_active_non_stay_home_states(current_states: pl.DataFrame) -> pl.DataFrame:
"""Get active non-stay-home states from the current aggregate state table.

Args:
current_states: Aggregate states occupied before the step update.

Returns:
Distinct active non-stay-home state keys.
"""
return (
current_states
.filter(pl.col("motive_seq_id") != 0)
.select(["demand_group_id", "motive_seq_id", "dest_seq_id", "mode_seq_id"])
.unique()
)


def get_latest_spatialized_chains(tmp_folders: dict[str, pathlib.Path]) -> pl.LazyFrame | None:
"""Load the latest available spatialized chains across prior iterations.

Args:
tmp_folders: Temporary folders containing spatialized-chain parquet
files.

Returns:
A lazy frame containing the most recent row for each state-step key, or
``None`` if no spatialized chains have been saved yet.
"""
if not any(tmp_folders["spatialized-chains"].glob("spatialized_chains_*.parquet")):
return None

pattern = tmp_folders["spatialized-chains"] / "spatialized_chains_*.parquet"
return (
pl.scan_parquet(str(pattern))
.sort("iteration", descending=True)
.unique(
subset=["demand_group_id", "motive_seq_id", "dest_seq_id", "seq_step_index"],
keep="first",
)
)


def empty_spatialized_chains() -> pl.DataFrame:
"""Create an empty spatialized-chains table with the expected schema.

Returns:
Empty spatialized chains.
"""
return pl.DataFrame(
schema={
"demand_group_id": pl.UInt32,
"motive_seq_id": pl.UInt32,
"dest_seq_id": pl.UInt32,
"seq_step_index": pl.UInt32,
"from": pl.Int32,
"to": pl.Int32,
"iteration": pl.UInt32,
}
)


def empty_mode_sequences() -> pl.DataFrame:
"""Create an empty mode-sequences table with the expected schema.

Returns:
Empty mode sequences.
"""
return pl.DataFrame(
schema={
"demand_group_id": pl.UInt32,
"motive_seq_id": pl.UInt32,
"dest_seq_id": pl.UInt32,
"mode_seq_id": pl.UInt32,
"seq_step_index": pl.UInt32,
"mode": pl.Utf8,
"iteration": pl.UInt32,
}
)
Loading
Loading