Skip to content

Commit

Permalink
perf(ScheduleController): Fetch schedules by individual stop for more…
Browse files Browse the repository at this point in the history
… cache hits (#231)

* perf(ScheduleController): Fetch schedules by individual stop for more cache hits

As implemented, if any request for a stop fails, the entire request fails. This is to preserve the existing all-or-nothing behavior for fetching schedules for all stops together. If we want to have partial failure states in the future, we'll want to adopt a frontend changes to represent the partial failure state.

* perf(MBTAV3API.Repository): Cache schedules responses for 1 hour

* feat(locustfile): Represent global data caching

* feat(load_testing): more realistic stop distribution

* fix(RepositoryCache): Actually start the cache

* style(locustfile): run linting

* perf(Repository): Try TTL cache for all static GTFS requests

* fix(RepositoryTest): clear cache

* fix(Locustfile): More realistic nearby stops numbers

* perf(ScheduleController): Don't use task when only one stop

* perf(ScheduleController): Only make requests async when more than 1

* test(Repository): Test schedules actually cached

* cleanup(locustfile): stray prints

* revert locustfile changes for separate PR

* refactor(Repository): Cache all/3, remove unused alerts fn

* feat(ScheduleController): unordered tasks & unsorted schedule list

* style: fix formatting

* fix(ScheduleControllerTest): Remove sorting expectation
  • Loading branch information
KaylaBrady authored Nov 8, 2024
1 parent 587b798 commit fdaf777
Show file tree
Hide file tree
Showing 10 changed files with 382 additions and 292 deletions.
4 changes: 4 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ config :mobile_app_backend, MBTAV3API.ResponseCache,
gc_interval: :timer.hours(1),
allocated_memory: 250_000_000

config :mobile_app_backend, MBTAV3API.RepositoryCache,
gc_interval: :timer.hours(2),
allocated_memory: 250_000_000

# Configures the endpoint
config :mobile_app_backend, MobileAppBackendWeb.Endpoint,
url: [host: "localhost"],
Expand Down
17 changes: 5 additions & 12 deletions lib/mbta_v3_api/repository.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ defmodule MBTAV3API.Repository do
"""
alias MBTAV3API.{JsonApi, Repository}

@callback alerts(JsonApi.Params.t(), Keyword.t()) ::
{:ok, JsonApi.Response.t(MBTAV3API.Alert.t())} | {:error, term()}

@callback route_patterns(JsonApi.Params.t(), Keyword.t()) ::
{:ok, JsonApi.Response.t(MBTAV3API.RoutePattern.t())} | {:error, term()}

Expand All @@ -22,13 +19,6 @@ defmodule MBTAV3API.Repository do
@callback trips(JsonApi.Params.t(), Keyword.t()) ::
{:ok, JsonApi.Response.t(MBTAV3API.Trip.t())} | {:error, term()}

def alerts(params, opts \\ []) do
Application.get_env(:mobile_app_backend, MBTAV3API.Repository, Repository.Impl).alerts(
params,
opts
)
end

def route_patterns(params, opts \\ []) do
Application.get_env(:mobile_app_backend, MBTAV3API.Repository, Repository.Impl).route_patterns(
params,
Expand Down Expand Up @@ -67,10 +57,12 @@ end

defmodule MBTAV3API.Repository.Impl do
@behaviour MBTAV3API.Repository

use Nebulex.Caching.Decorators

alias MBTAV3API.JsonApi

@impl true
def alerts(params, opts \\ []), do: all(MBTAV3API.Alert, params, opts)
@ttl :timer.hours(1)

@impl true
def route_patterns(params, opts \\ []), do: all(MBTAV3API.RoutePattern, params, opts)
Expand All @@ -89,6 +81,7 @@ defmodule MBTAV3API.Repository.Impl do

@spec all(module(), JsonApi.Params.t(), Keyword.t()) ::
{:ok, JsonApi.Response.t(JsonApi.Object.t())} | {:error, term()}
@decorate cacheable(cache: MBTAV3API.RepositoryCache, on_error: :nothing, opts: [ttl: @ttl])
defp all(module, params, opts) do
params = JsonApi.Params.flatten_params(params, module)
url = "/#{JsonApi.Object.plural_type(module.jsonapi_type())}"
Expand Down
6 changes: 6 additions & 0 deletions lib/mbta_v3_api/repository_cache.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule MBTAV3API.RepositoryCache do
@moduledoc """
Cache used to reduce the number of calls to the V3 API.
"""
use Nebulex.Cache, otp_app: :mobile_app_backend, adapter: Nebulex.Adapters.Local
end
1 change: 1 addition & 0 deletions lib/mobile_app_backend/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ defmodule MobileAppBackend.Application do
:default => [size: 200, count: 10, start_pool_metrics?: true]
}},
{MBTAV3API.ResponseCache, []},
{MBTAV3API.RepositoryCache, []},
MBTAV3API.Supervisor,
{MobileAppBackend.FinchPoolHealth, pool_name: Finch.CustomPool},
MobileAppBackend.MapboxTokenRotator,
Expand Down
82 changes: 66 additions & 16 deletions lib/mobile_app_backend_web/controllers/schedule_controller.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
defmodule MobileAppBackendWeb.ScheduleController do
use MobileAppBackendWeb, :controller
require Logger
alias MBTAV3API.JsonApi
alias MBTAV3API.Repository

def schedules(conn, %{"stop_ids" => stop_ids, "date_time" => date_time}) do
if stop_ids == "" do
def schedules(conn, %{"stop_ids" => stop_ids_concat, "date_time" => date_time}) do
if stop_ids_concat == "" do
json(conn, %{schedules: [], trips: %{}})
else
{:ok, data} =
get_filter(stop_ids, date_time)
|> fetch_schedules()
stop_ids = String.split(stop_ids_concat, ",")

json(conn, data)
service_date = parse_service_date(date_time)

filters = Enum.map(stop_ids, &get_filter(&1, service_date))

data =
case filters do
[filter] -> fetch_schedules(filter)
filters -> fetch_schedules_parallel(filters)
end

case data do
:error ->
conn
|> put_status(:internal_server_error)
|> json(%{error: "fetch_failed"})

data ->
json(conn, data)
end
end
end

Expand Down Expand Up @@ -39,20 +56,53 @@ defmodule MobileAppBackendWeb.ScheduleController do
json(conn, response)
end

@spec get_filter(String.t(), String.t()) :: [JsonApi.Params.filter_param()]
defp get_filter(stop_ids, date_time) do
date_time = Util.parse_datetime!(date_time)
service_date = Util.datetime_to_gtfs(date_time)
[stop: stop_ids, date: service_date]
@spec parse_service_date(String.t()) :: Date.t()
defp parse_service_date(date_string) do
date_string
|> Util.parse_datetime!()
|> Util.datetime_to_gtfs()
end

@spec get_filter(String.t(), Date.t()) :: [JsonApi.Params.filter_param()]
defp get_filter(stop_id, service_date) do
[stop: stop_id, date: service_date]
end

@spec fetch_schedules_parallel([[JsonApi.Params.filter_param()]]) ::
%{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()} | :error
defp fetch_schedules_parallel(filters) do
filters
|> Task.async_stream(
fn filter_params ->
{filter_params, fetch_schedules(filter_params)}
end,
ordered: false
)
|> Enum.reduce_while(%{schedules: [], trips: %{}}, fn result, acc ->
case result do
{:ok, {_params, %{schedules: schedules, trips: trips}}} ->
{:cont, %{schedules: acc.schedules ++ schedules, trips: Map.merge(acc.trips, trips)}}

{_result_type, {params, _response}} ->
Logger.warning(
"#{__MODULE__} skipped returning schedules due to error. params=#{inspect(params)}"
)

{:halt, :error}
end
end)
end

@spec fetch_schedules([JsonApi.Params.filter_param()]) ::
{:ok, %{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()}}
| {:error, term()}
%{schedules: [MBTAV3API.Schedule.t()], trips: JsonApi.Object.trip_map()}
| :error
defp fetch_schedules(filter) do
with {:ok, %{data: schedules, included: %{trips: trips}}} <-
Repository.schedules(filter: filter, include: :trip, sort: {:departure_time, :asc}) do
{:ok, %{schedules: schedules, trips: trips}}
case Repository.schedules(filter: filter, include: :trip) do
{:ok, %{data: schedules, included: %{trips: trips}}} ->
%{schedules: schedules, trips: trips}

_ ->
:error
end
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ defmodule MobileAppBackend.MixProject do
{:esbuild, "~> 0.7", runtime: Mix.env() == :dev},
{:tailwind, "~> 0.2.0", runtime: Mix.env() == :dev},
{:logster, "~> 1.1"},
{:decorator, "~> 1.4"},
{:diskusage_logger, "~> 0.2", only: :prod},
{:ehmon, github: "mbta/ehmon", only: :prod},
{:sobelow, "~> 0.13", only: [:dev, :test], runtime: false},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.4.0", "f239f68b588efa7707abce16a84d0d2acf3a0f50571f8bb7f56a15865aae820c", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7d98bac1ee4565d31b62d59f8823dfd8356a169e7fcbb83831b8a5397404c9de"},
"cowlib": {:hex, :cowlib, "2.13.0", "db8f7505d8332d98ef50a3ef34b34c1afddec7506e4ee4dd4a3a266285d282ca", [:make, :rebar3], [], "hexpm", "e1e1284dc3fc030a64b1ad0d8382ae7e99da46c3246b815318a4b848873800a4"},
"credo": {:hex, :credo, "1.7.9", "07bb31907746ae2b5e569197c9e16c0d75c8578a22f01bee63f212047efb2647", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f87c11c34ba579f7c5044f02b2a807e1ed2fa5fdbb24dc7eb4ad59c1904887f3"},
"decorator": {:hex, :decorator, "1.4.0", "a57ac32c823ea7e4e67f5af56412d12b33274661bb7640ec7fc882f8d23ac419", [:mix], [], "hexpm", "0a07cedd9083da875c7418dea95b78361197cf2bf3211d743f6f7ce39656597f"},
"dialyxir": {:hex, :dialyxir, "1.4.4", "fb3ce8741edeaea59c9ae84d5cec75da00fa89fe401c72d6e047d11a61f65f70", [:mix], [{:erlex, ">= 0.2.7", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "cd6111e8017ccd563e65621a4d9a4a1c5cd333df30cebc7face8029cacb4eff6"},
"diskusage_logger": {:hex, :diskusage_logger, "0.2.0", "04fc48b538fe4de43153542a71ea94f623d54707d85844123baacfceedf625c3", [:mix], [], "hexpm", "e3f2aed1b0fc4590931c089a6453a4c4eb4c945912aa97bcabcc0cff7851f34d"},
"dns_cluster": {:hex, :dns_cluster, "0.1.3", "0bc20a2c88ed6cc494f2964075c359f8c2d00e1bf25518a6a6c7fd277c9b0c66", [:mix], [], "hexpm", "46cb7c4a1b3e52c7ad4cbe33ca5079fbde4840dedeafca2baf77996c2da1bc33"},
Expand Down
132 changes: 1 addition & 131 deletions test/mbta_v3_api/alert_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,141 +3,11 @@ defmodule MBTAV3API.AlertTest do

import Mox

alias MBTAV3API.{Alert, JsonApi, Repository}
alias MBTAV3API.{Alert, JsonApi}
import Test.Support.Sigils

setup :verify_on_exit!

test "get_all/1" do
expect(
MobileAppBackend.HTTPMock,
:request,
fn %Req.Request{url: %URI{path: "/alerts"}, options: %{params: params}} ->
assert params == %{
"fields[alert]" =>
"active_period,cause,description,effect,effect_name,header,informed_entity,lifecycle,updated_at",
"filter[lifecycle]" => "NEW,ONGOING,ONGOING_UPCOMING",
"filter[stop]" =>
"9983,6542,1241,8281,place-boyls,8279,49002,6565,place-tumnl,145,place-pktrm,place-bbsta"
}

{:ok,
Req.Response.json(%{
data: [
%{
"attributes" => %{
"active_period" => [
%{"end" => "2024-02-08T19:12:40-05:00", "start" => "2024-02-08T14:38:00-05:00"}
],
"cause" => "UNKNOWN_CAUSE",
"description" => "Description 1",
"effect" => "DELAY",
"header" => "Header 1",
"informed_entity" => [
%{
"activities" => ["BOARD", "EXIT", "RIDE"],
"route" => "11",
"route_type" => 3
}
],
"lifecycle" => "NEW",
"updated_at" => "2024-02-08T14:38:00-05:00"
},
"id" => "552825",
"links" => %{"self" => "/alerts/552825"},
"type" => "alert"
},
%{
"attributes" => %{
"active_period" => [
%{"end" => "2024-02-08T19:12:40-05:00", "start" => "2024-02-08T12:55:00-05:00"}
],
"cause" => "UNRULY_PASSENGER",
"description" => "Description 2",
"effect" => "DELAY",
"header" => "Header 2",
"informed_entity" => [
%{
"activities" => ["BOARD", "EXIT", "RIDE"],
"route" => "15",
"route_type" => 3
}
],
"lifecycle" => "NEW",
"updated_at" => "2024-02-08T12:55:00-05:00"
},
"id" => "552803",
"links" => %{"self" => "/alerts/552803"},
"type" => "alert"
}
]
})}
end
)

{:ok, %{data: alerts}} =
Repository.alerts(
filter: [
lifecycle: [:new, :ongoing, :ongoing_upcoming],
stop: [
"9983",
"6542",
"1241",
"8281",
"place-boyls",
"8279",
"49002",
"6565",
"place-tumnl",
"145",
"place-pktrm",
"place-bbsta"
]
]
)

assert alerts == [
%Alert{
id: "552825",
active_period: [
%Alert.ActivePeriod{start: ~B[2024-02-08 14:38:00], end: ~B[2024-02-08 19:12:40]}
],
cause: :unknown_cause,
description: "Description 1",
effect: :delay,
header: "Header 1",
informed_entity: [
%Alert.InformedEntity{
activities: [:board, :exit, :ride],
route: "11",
route_type: :bus
}
],
lifecycle: :new,
updated_at: ~B[2024-02-08 14:38:00]
},
%Alert{
id: "552803",
active_period: [
%Alert.ActivePeriod{start: ~B[2024-02-08 12:55:00], end: ~B[2024-02-08 19:12:40]}
],
cause: :unruly_passenger,
description: "Description 2",
effect: :delay,
header: "Header 2",
informed_entity: [
%Alert.InformedEntity{
activities: [:board, :exit, :ride],
route: "15",
route_type: :bus
}
],
lifecycle: :new,
updated_at: ~B[2024-02-08 12:55:00]
}
]
end

describe "active?/1" do
test "true if in single active period" do
assert Alert.active?(
Expand Down
Loading

0 comments on commit fdaf777

Please sign in to comment.