Skip to content

Commit

Permalink
chore: Add telemetry handlers for Oban job events (#1029)
Browse files Browse the repository at this point in the history
* chore: Add telemetry handlers for Oban job events

* Test Oban job exception logger

* Logger.warn -> Logger.warning
  • Loading branch information
jzimbel-mbta authored Oct 31, 2024
1 parent 414abeb commit 9cd8436
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 24 deletions.
2 changes: 2 additions & 0 deletions lib/arrow/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Arrow.Application do
run_adjustment_fetcher? = Application.get_env(:arrow, :fetch_adjustments?)
run_migrations_at_startup? = Application.get_env(:arrow, :run_migrations_at_startup?)

Arrow.Telemetry.setup_telemetry()

# List all child processes to be supervised
children =
[
Expand Down
31 changes: 9 additions & 22 deletions lib/arrow/gtfs.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Arrow.Gtfs do

require Logger
alias Arrow.Gtfs.Importable
alias Arrow.Gtfs.JobHelper
alias Arrow.Repo

@import_timeout_ms :timer.minutes(10)
Expand All @@ -24,53 +25,39 @@ defmodule Arrow.Gtfs do
@spec import(Unzip.t(), String.t(), String.t() | nil, Oban.Job.t(), boolean) ::
        :ok | {:error, term}
def import(unzip, new_version, current_version, job, dry_run? \\ false) do
  # Compute the job's logging params once up front and reuse the string in
  # every log message below, rather than rebuilding it per message.
  job_info = JobHelper.logging_params(job)

  Logger.info("GTFS import or validation job starting #{job_info}")

  with :ok <- validate_required_files(unzip),
       :ok <- validate_version_change(new_version, current_version) do
    case import_transaction(unzip, dry_run?) do
      {:ok, _} ->
        Logger.info("GTFS import success #{job_info}")
        :ok

      # A dry run deliberately rolls the transaction back after validating,
      # so this "error" tag is actually the success path for validation jobs.
      {:error, :dry_run_success} ->
        Logger.info("GTFS validation success #{job_info}")
        :ok

      {:error, reason} = error ->
        Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")

        error
    end
  else
    # The archive's version matches the currently imported one — nothing to do.
    :unchanged ->
      Logger.info("GTFS import skipped due to unchanged version #{job_info}")

      :ok

    {:error, reason} = error ->
      Logger.warning("GTFS import or validation failed reason=#{inspect(reason)} #{job_info}")

      error
  end
end

# Builds a key=value string describing a GTFS import/validation job, suitable
# for inclusion in log messages. Raises `KeyError` if the job args are missing
# "s3_uri" or "archive_version".
defp job_logging_params(job) do
  s3_uri = Map.fetch!(job.args, "s3_uri")
  archive_version = Map.fetch!(job.args, "archive_version")

  # Only the object key (the URI's path component) is worth logging.
  s3_object_key = URI.parse(s3_uri).path

  "job_id=#{job.id} archive_s3_object_key=#{s3_object_key} archive_version=\"#{archive_version}\" job_worker=#{job.worker}"
end

defp import_transaction(unzip, dry_run?) do
transaction = fn ->
_ = truncate_all()
Expand Down
2 changes: 1 addition & 1 deletion lib/arrow/gtfs/archive.ex
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ defmodule Arrow.Gtfs.Archive do
get_object_op = ExAws.S3.get_object(bucket, object_key)

case apply(mod, fun, [get_object_op]) do
{:ok, %{body: zip_data}} ->
{:ok, %{body: zip_data, status_code: 200}} ->
zip_data
|> List.wrap()
|> from_iodata()
Expand Down
18 changes: 17 additions & 1 deletion lib/arrow/gtfs/job_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule Arrow.Gtfs.JobHelper do
:all | :queued | :executing | :succeeded | :failed | :cancelled | :not_done | :done

@doc """
Returns details about GTFS import jobs.
Returns details about GTFS import/validation jobs in a JSON-encodable list of maps.
"""
@spec check_jobs(module, status_filter) :: list(map)
def check_jobs(worker_mod, status_filter) do
Expand All @@ -36,6 +36,22 @@ defmodule Arrow.Gtfs.JobHelper do
)
end

@doc """
Builds a loggable string of relevant info about an import/validation job.

Raises `KeyError` if the job args lack "s3_uri" or "archive_version".
"""
@spec logging_params(Oban.Job.t()) :: String.t()
def logging_params(job) do
  s3_uri = Map.fetch!(job.args, "s3_uri")
  archive_version = Map.fetch!(job.args, "archive_version")

  # Only the object key (the URI's path component) is interesting in logs.
  s3_object_key = URI.parse(s3_uri).path

  "job_id=#{job.id} archive_s3_object_key=#{inspect(s3_object_key)} archive_version=#{inspect(archive_version)} job_worker=#{inspect(job.worker)}"
end

defp job_filters do
%{
all: Enum.map(Oban.Job.states(), &Atom.to_string/1),
Expand Down
60 changes: 60 additions & 0 deletions lib/arrow/telemetry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
defmodule Arrow.Telemetry do
  @moduledoc """
  Telemetry listeners for Arrow business logic.
  """
  require Logger

  # Oban job lifecycle events this module subscribes to.
  @oban_events [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]

  # GTFS workers get richer, archive-aware logging via JobHelper.
  @gtfs_workers [inspect(Arrow.Gtfs.ImportWorker), inspect(Arrow.Gtfs.ValidationWorker)]

  @doc """
  Attaches this module's handlers to the Oban job telemetry events.
  """
  @spec setup_telemetry() :: :ok
  def setup_telemetry do
    # telemetry prefers event handler to be passed as a non-local function
    # capture--i.e., with module name included--for performance reasons.
    _ = :telemetry.attach_many("oban", @oban_events, &Arrow.Telemetry.handle_event/4, [])

    :ok
  end

  def handle_event(event, measures, meta, config)

  def handle_event([:oban, :job, :start], _measures, meta, _config) do
    Logger.info("Oban job started #{get_job_info(meta.job)}")
  end

  def handle_event([:oban, :job, :stop], measures, meta, _config) do
    Logger.info(
      "Oban job stopped #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time}"
    )
  end

  def handle_event([:oban, :job, :exception], measures, meta, _config) do
    Logger.warning(
      "Oban job exception #{get_job_info(meta.job)} state=#{meta.state} result=#{inspect(meta.result)} duration=#{measures.duration} memory=#{measures.memory} queue_time=#{measures.queue_time} #{exception_details(meta)}"
    )
  end

  # For `:error` kinds, prefix the formatted crash report with the exception's
  # one-line message; for `:throw`/`:exit`, just include the formatted report.
  defp exception_details(%{kind: :error} = meta) do
    message = Exception.message(meta.reason)
    full_details = Exception.format(:error, meta.reason, meta.stacktrace)
    "message=#{inspect(message)}\n#{full_details}"
  end

  defp exception_details(meta) do
    "\n#{Exception.format(meta.kind, meta.reason, meta.stacktrace)}"
  end

  defp get_job_info(%Oban.Job{worker: worker} = job) when worker in @gtfs_workers,
    do: Arrow.Gtfs.JobHelper.logging_params(job)

  defp get_job_info(job),
    do: "job_id=#{job.id} job_args=#{inspect(job.args)} job_worker=#{inspect(job.worker)}"
end
4 changes: 4 additions & 0 deletions lib/arrow_web/telemetry.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule ArrowWeb.Telemetry do
@moduledoc """
Provides data for the LiveDashboard "metrics" tab.
"""

use Supervisor
import Telemetry.Metrics

Expand Down
36 changes: 36 additions & 0 deletions test/arrow/telemetry_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
defmodule Arrow.ExceptionalWorker do
  @moduledoc """
  Test-only worker whose `perform/1` always raises, used to exercise the
  Oban job exception telemetry handler.
  """
  use Oban.Worker

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"arg" => arg}}), do: raise("argh! arg: #{arg}")
end

defmodule Arrow.TelemetryTest do
  @moduledoc false
  use ExUnit.Case, async: true
  use Oban.Testing, repo: Arrow.Repo
  import ExUnit.CaptureLog

  describe "oban.job.exception listener" do
    test "logs exception info" do
      args = %{arg: "argyle gargoyle"}

      logged =
        capture_log([level: :warning], fn ->
          # The worker raises by design; swallow the exception so we can
          # inspect what the telemetry handler logged.
          try do
            perform_job(Arrow.ExceptionalWorker, args)
          rescue
            _any -> nil
          end
        end)

      assert logged =~ "Oban job exception"
      assert logged =~ ~s|message="argh! arg: argyle gargoyle"|
    end
  end
end

0 comments on commit 9cd8436

Please sign in to comment.