Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 51 additions & 47 deletions apps/state/lib/state/alert/hooks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,42 +6,72 @@ defmodule State.Alert.Hooks do

@spec pre_insert_hook(Alert.t()) :: [Alert.t()]
def pre_insert_hook(alert) do
entities = entities_with_parents(alert.informed_entity)
entities = add_computed_entities(alert.informed_entity)

[
%{alert | informed_entity: entities}
]
end

defp entities_with_parents(entities) do
defp add_computed_entities(entities) do
entities
|> Stream.flat_map(&include_entity_parent_stop/1)
|> Stream.flat_map(&include_entity_alternate_trips/1)
|> Enum.group_by(&get_key/1)
|> Stream.flat_map(&merge_entities/1)
|> Stream.concat(get_parent_station_entities(entities))
|> Enum.flat_map(&include_alternate_trip_entities/1)
|> Enum.uniq()
end

defp get_key(%{} = ie), do: Map.take(ie, ~w(route stop trip)a)
@spec get_parent_station_entities([Alert.informed_entity()]) :: [Alert.informed_entity()]
defp get_parent_station_entities(entities) do
entities
|> Stream.map(&get_parent_station_entity/1)
|> Stream.reject(&is_nil/1)
|> Enum.group_by(&get_key/1)
|> Enum.flat_map(&merge_parent_entity_activities/1)
end

defp get_parent_station_entity(%{stop: stop_id} = entity) when is_binary(stop_id) do
case State.Stop.by_id(stop_id) do
%{parent_station: station} when is_binary(station) ->
%{entity | stop: station}

_ ->
nil
end
end

defp merge_entities({_key, entities}) do
activities = merge_activities(entities)
defp get_parent_station_entity(_entity) do
nil
end

defp merge_parent_entity_activities({_key, parent_entities}) do
merged_activities =
parent_entities
|> Enum.map(&MapSet.new(&1[:activities] || []))
|> Enum.reduce(&MapSet.union/2)
|> Enum.sort()

if MapSet.size(activities) == 0 do
entities
if merged_activities == [] do
parent_entities
else
result = MapSet.to_list(activities)
for entity <- entities, do: Map.put(entity, :activities, result)
parent_entities
|> Enum.map(&Map.put(&1, :activities, merged_activities))
|> Enum.uniq()
end
end

defp merge_activities(entities) when is_list(entities) do
Enum.reduce(entities, MapSet.new(), fn ie, acc ->
case Map.get(ie, :activities) do
[_ | _] = activities -> MapSet.union(acc, MapSet.new(activities))
_ -> acc
end
end)
@spec include_alternate_trip_entities(Alert.informed_entity()) :: [Alert.informed_entity()]
defp include_alternate_trip_entities(%{trip: trip_id} = entity) when is_binary(trip_id) do
case all_route_entities(entity) do
[] ->
[entity]

entities ->
entities
end
end

defp include_alternate_trip_entities(entity) do
[entity]
end

defp all_route_entities(%{trip: trip_id} = entity) when is_binary(trip_id) do
Expand Down Expand Up @@ -77,31 +107,5 @@ defmodule State.Alert.Hooks do
end
end

defp include_entity_alternate_trips(%{trip: trip_id} = entity) when is_binary(trip_id) do
case all_route_entities(entity) do
[] ->
[entity]

entities ->
entities
end
end

defp include_entity_alternate_trips(entity) do
[entity]
end

defp include_entity_parent_stop(%{stop: stop_id} = entity) when is_binary(stop_id) do
case State.Stop.by_id(stop_id) do
%{parent_station: station} when is_binary(station) ->
[entity, %{entity | stop: station}]

_ ->
[entity]
end
end

defp include_entity_parent_stop(entity) do
[entity]
end
defp get_key(%{} = ie), do: Map.take(ie, ~w(route stop trip)a)
end
Loading