Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(VehiclesForRouteChannel): Use vehicles from PubSub instead of GenServer #225

Merged
merged 7 commits into from
Oct 25, 2024
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ config :mobile_app_backend, MobileAppBackend.AppCheck,
jwks_url: "https://firebaseappcheck.googleapis.com/v1/jwks"

config :mobile_app_backend, predictions_broadcast_interval_ms: 5_000
config :mobile_app_backend, vehicles_broadcast_interval_ms: 500

config :mobile_app_backend, MBTAV3API.ResponseCache,
gc_interval: :timer.hours(1),
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ config :mobile_app_backend, start_stream_stores?: false

config :mobile_app_backend, start_global_cache?: false

config :mobile_app_backend, start_vehicle_pub_sub?: false

# Print only warnings and errors during test
config :logger, level: :warning

Expand Down
4 changes: 0 additions & 4 deletions lib/mbta_v3_api/stream/static_instance.ex
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,4 @@ defmodule MBTAV3API.Stream.StaticInstance.Impl do
}
]
end

defp args_for_topic("vehicles") do
[type: MBTAV3API.Vehicle, url: "/vehicles", topic: "vehicles"]
end
end
8 changes: 7 additions & 1 deletion lib/mobile_app_backend/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ defmodule MobileAppBackend.Application do
{MobileAppBackend.FinchPoolHealth, pool_name: Finch.CustomPool},
MobileAppBackend.MapboxTokenRotator,
MobileAppBackend.Predictions.Registry,
MobileAppBackend.Predictions.PubSub
MobileAppBackend.Predictions.PubSub,
MobileAppBackend.Vehicles.Registry
] ++
if Application.get_env(:mobile_app_backend, :start_vehicle_pub_sub?, true) do
[MobileAppBackend.Vehicles.PubSub]
else
[]
end ++
if start_global_cache? do
[MobileAppBackend.GlobalDataCache]
else
Expand Down
203 changes: 203 additions & 0 deletions lib/mobile_app_backend/vehicles/pub_sub.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
defmodule MobileAppBackend.Vehicles.PubSub.Behaviour do
alias MBTAV3API.{Route, Vehicle}

@doc """
Subscribe to vehicle updates for the given routes & direction
"""
@callback subscribe_for_routes([Route.id()], 0 | 1) :: [Vehicle.t()]

@doc """
Subscribe to updates for the given vehicle
"""
@callback subscribe(Vehicle.id()) :: Vehicle.t() | nil
end

defmodule MobileAppBackend.Vehicles.PubSub do
@moduledoc """
Allows channels to subscribe to the subset of vehicles they are interested
in and receive updates as the vehicles data changes.

For each subset of vehicles that channels are actively subscribed to, this broadcasts
the latest state of the world (if it has changed) to the registered consumer in two circumstances
1. Regularly scheduled interval - configured by `:vehicles_broadcast_interval_ms`
2. When there is a reset event of the underlying vehicle stream.
"""
use GenServer
alias MBTAV3API.{Store, Stream}
alias MobileAppBackend.Vehicles.PubSub

@behaviour PubSub.Behaviour

require Logger

@fetch_registry_key :fetch_registry_key

@typedoc """
tuple {fetch_keys, format_fn} where format_fn transforms the data returned
into the format expected by subscribers.
"""
@type registry_value :: {Store.fetch_keys(), function()}

@type state :: %{last_dispatched_table_name: atom()}

@spec start_link(Keyword.t()) :: GenServer.on_start()
@spec start_link() :: :ignore | {:error, any()} | {:ok, pid()}
def start_link(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)

GenServer.start_link(
__MODULE__,
opts,
name: name
)
end

@impl true
def subscribe_for_routes(route_ids, direction_id) do
route_fetch_key_pairs = Enum.map(route_ids, &[route_id: &1, direction_id: direction_id])

Registry.register(
MobileAppBackend.Vehicles.Registry,
@fetch_registry_key,
{route_fetch_key_pairs, fn data -> data end}
)

Store.Vehicles.fetch(route_fetch_key_pairs)
end

@impl true
def subscribe(vehicle_id) do
fetch_keys = [id: vehicle_id]

format_fn = fn data ->
case data do
[vehicle | _shouldnt_be_rest] -> vehicle
_ -> nil
end
end

Registry.register(
MobileAppBackend.Vehicles.Registry,
@fetch_registry_key,
{fetch_keys, format_fn}
)

fetch_keys
|> Store.Vehicles.fetch()
|> format_fn.()
end

@impl GenServer
def init(opts \\ []) do
Stream.StaticInstance.subscribe("vehicles:to_store")

broadcast_timer(50)

create_table_fn =
Keyword.get(opts, :create_table_fn, fn ->
:ets.new(:last_dispatched_vehicles, [:set, :named_table])
{:ok, %{last_dispatched_table_name: :last_dispatched_vehicles}}
end)

create_table_fn.()
end

@impl true
# Any time there is a reset_event, broadcast so that subscribers are immediately
# notified of the changes. This way, when the vehicle stream first starts,
# consumers don't have to wait `:vehicles_broadcast_interval_ms` to receive their first message.
def handle_info(:reset_event, state) do
send(self(), :broadcast)
{:noreply, state, :hibernate}
end

def handle_info(:timed_broadcast, state) do
send(self(), :broadcast)
broadcast_timer()
{:noreply, state, :hibernate}
end

@impl GenServer
def handle_info(:broadcast, %{last_dispatched_table_name: last_dispatched} = state) do
Registry.dispatch(MobileAppBackend.Vehicles.Registry, @fetch_registry_key, fn entries ->
Enum.group_by(
entries,
fn {_, {fetch_keys, format_fn}} -> {fetch_keys, format_fn} end,
fn {pid, _} -> pid end
)
|> Enum.each(fn {registry_value, pids} ->
broadcast_new_vehicles(registry_value, pids, last_dispatched)
end)
end)

{:noreply, state, :hibernate}
end

defp broadcast_new_vehicles(
{fetch_keys, format_fn} = registry_value,
pids,
last_dispatched_table_name
) do
new_vehicles =
fetch_keys
|> Store.Vehicles.fetch()
|> format_fn.()

last_dispatched_entry = :ets.lookup(last_dispatched_table_name, registry_value)

if !vehicles_already_broadcast(last_dispatched_entry, new_vehicles) do
broadcast_vehicles(pids, new_vehicles, registry_value, last_dispatched_table_name)
end
end

defp broadcast_vehicles(
pids,
vehicles,
{fetch_keys, _format_fn} = registry_value,
last_dispatched_table_name
) do
Logger.info("#{__MODULE__} broadcasting to pids len=#{length(pids)}")

{time_micros, _result} =
:timer.tc(__MODULE__, :broadcast_to_pids, [
pids,
vehicles
])

Logger.info(
"#{__MODULE__} broadcast_to_pids fetch_keys=#{inspect(fetch_keys)} duration=#{time_micros / 1000}"
)

:ets.insert(last_dispatched_table_name, {registry_value, vehicles})
end

defp vehicles_already_broadcast([], _new_vehicles) do
# Nothing has been broadcast yet
false
end

defp vehicles_already_broadcast([{_registry_key, last_vehicles}], new_vehicles) do
last_vehicles == new_vehicles
end

def broadcast_to_pids(pids, vehicles) do
Enum.each(
pids,
&send(
&1,
{:new_vehicles, vehicles}
)
)
end

defp broadcast_timer do
interval =
Application.get_env(:mobile_app_backend, :vehicles_broadcast_interval_ms, 500)

broadcast_timer(interval)
end

defp broadcast_timer(interval) do
Process.send_after(self(), :timed_broadcast, interval)
end
end
16 changes: 16 additions & 0 deletions lib/mobile_app_backend/vehicles/registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule MobileAppBackend.Vehicles.Registry do
def child_spec(_) do
Registry.child_spec(keys: :duplicate, name: __MODULE__)
end

@spec via_name(term()) :: GenServer.name()
def via_name(key), do: {:via, Registry, {__MODULE__, key}}

@spec find_pid(term()) :: pid() | nil
def find_pid(key) do
case Registry.lookup(__MODULE__, key) do
[{pid, _}] -> pid
[] -> nil
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,7 @@ defmodule MobileAppBackendWeb.PredictionsForStopsV2Channel do
@spec handle_info({:new_predictions, any()}, Phoenix.Socket.t()) ::
{:noreply, Phoenix.Socket.t()}
def handle_info({:new_predictions, new_predictions_for_stop}, socket) do
{time_micros, _result} =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny log cleanup while copying over behavior of predictions channel

:timer.tc(fn ->
:ok = push(socket, "stream_data", new_predictions_for_stop)
end)

Logger.info("#{__MODULE__} push duration=#{time_micros / 1000}")

require Logger
:ok = push(socket, "stream_data", new_predictions_for_stop)
{:noreply, socket}
end
end
46 changes: 10 additions & 36 deletions lib/mobile_app_backend_web/channels/vehicle_channel.ex
Original file line number Diff line number Diff line change
@@ -1,49 +1,23 @@
defmodule MobileAppBackendWeb.VehicleChannel do
use MobileAppBackendWeb, :channel

alias MBTAV3API.JsonApi
alias MBTAV3API.Vehicle

@throttle_ms 500

@impl true
def join("vehicle:id:" <> vehicle_id, _payload, socket) do
{:ok, throttler} =
MobileAppBackend.Throttler.start_link(target: self(), cast: :send_data, ms: @throttle_ms)

{:ok, vehicle_data} = MBTAV3API.Stream.StaticInstance.subscribe("vehicles")
pubsub_module =
Application.get_env(
:mobile_app_backend,
MobileAppBackend.Vehicles.PubSub,
MobileAppBackend.Vehicles.PubSub
)

vehicle_data = filter_data(vehicle_data, vehicle_id)
vehicle = pubsub_module.subscribe(vehicle_id)

{:ok, vehicle_data,
assign(socket,
data: vehicle_data,
vehicle_id: vehicle_id,
throttler: throttler
)}
{:ok, %{vehicle: vehicle}, socket}
end

@impl true
def handle_info({:stream_data, "vehicles", all_vehicles_data}, socket) do
old_data = socket.assigns.data
new_data = filter_data(all_vehicles_data, socket.assigns.vehicle_id)

if old_data != new_data do
MobileAppBackend.Throttler.request(socket.assigns.throttler)
end

socket = assign(socket, data: new_data)
def handle_info({:new_vehicles, vehicle}, socket) do
:ok = push(socket, "stream_data", %{vehicle: vehicle})
{:noreply, socket}
end

@impl true
def handle_cast(:send_data, socket) do
:ok = push(socket, "stream_data", socket.assigns.data)
{:noreply, socket}
end

@spec filter_data(JsonApi.Object.full_map(), String.t()) :: %{vehicle: Vehicle.t() | nil}
defp filter_data(all_vehicles_data, vehicle_id) do
%{vehicle: Map.get(all_vehicles_data.vehicles, vehicle_id)}
end
end
Loading
Loading