Skip to content

Commit

Permalink
Prevent sonar timeouts bubbling into the stager
Browse files Browse the repository at this point in the history
A clogged Ecto pool could cause cascading errors on startup due to a
sequence of calls between the `Notifier`, `Sonar`, and `Stager`.

1. `Sonar` sends a notification in `handle_continue` on startup.
2. The notification is blocked while the `Notifier` waits for a
   connection from the Ecto pool.
3. `Stager` checks for the connection status on startup, which would
   eventually time out because the `Sonar` hadn't finished initializing.
4. The `Stager` crashes from the timeout error.

This makes the folloing changes to prevent this sequence of events:

1. The `Stager` no longer gets the sonar status during startup.
2. The `Notifier` catches timeout errors from `Sonar` checks, warns
   about it, then returns an `:unknown` status.
  • Loading branch information
sorentwo committed Jun 25, 2024
1 parent ef0805f commit 4c0ac00
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 21 deletions.
8 changes: 4 additions & 4 deletions lib/oban/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ defmodule Oban.Job do
{:args, args()}
| {:max_attempts, pos_integer()}
| {:meta, map()}
| {:priority, pos_integer()}
| {:priority, 0..9}
| {:queue, atom() | binary()}
| {:schedule_in, schedule_in_option()}
| {:replace_args, boolean()}
| {:replace, [replace_option() | replace_by_state_option()]}
| {:replace_args, boolean()}
| {:schedule_in, schedule_in_option()}
| {:scheduled_at, DateTime.t()}
| {:tags, tags()}
| {:unique, [unique_option()]}
Expand All @@ -101,7 +101,7 @@ defmodule Oban.Job do
attempted_by: [binary()] | nil,
max_attempts: pos_integer(),
meta: map(),
priority: non_neg_integer(),
priority: 0..9,
inserted_at: DateTime.t(),
scheduled_at: DateTime.t(),
attempted_at: DateTime.t() | nil,
Expand Down
7 changes: 7 additions & 0 deletions lib/oban/notifier.ex
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ defmodule Oban.Notifier do

alias Oban.{Config, Registry, Sonar}

require Logger

@type channel :: atom()
@type name_or_conf :: Oban.name() | Config.t()
@type payload :: map() | [map()]
Expand Down Expand Up @@ -246,6 +248,11 @@ defmodule Oban.Notifier do
name
|> Oban.Registry.via(Sonar)
|> GenServer.call(:get_status)
catch
:exit, {:timeout, _} = reason ->
Logger.warning("Oban.Notifier.status/1 check failed due to #{inspect(reason)}.")

:unknown
end

@doc false
Expand Down
20 changes: 3 additions & 17 deletions lib/oban/stager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,7 @@ defmodule Oban.Stager do
# Init event is essential for auto-allow and backward compatibility.
:telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})

{:ok, state, {:continue, :start}}
end

@impl GenServer
def handle_continue(:start, %State{} = state) do
state =
state
|> schedule_staging()
|> check_mode()

{:noreply, state}
{:ok, schedule_staging(state)}
end

@impl GenServer
Expand All @@ -62,6 +52,7 @@ defmodule Oban.Stager do

@impl GenServer
def handle_info(:stage, %State{} = state) do
state = check_mode(state)
meta = %{conf: state.conf, leader: Peer.leader?(state.conf), plugin: __MODULE__}

:telemetry.span([:oban, :plugin], meta, fn ->
Expand All @@ -74,12 +65,7 @@ defmodule Oban.Stager do
end
end)

state =
state
|> schedule_staging()
|> check_mode()

{:noreply, state}
{:noreply, schedule_staging(state)}
end

defp stage_and_notify(true = _leader, state) do
Expand Down

0 comments on commit 4c0ac00

Please sign in to comment.