Skip to content

Commit

Permalink
Introduce Oban.Cron with schedule_interval/4
Browse files Browse the repository at this point in the history
The new Cron module allows processes, namely plugins, to get cron-like
scheduled functionality with a single function call. This will allow
plugins to removes boilerplate around parsing, scheduling, and
evaluating for cron behavior.
  • Loading branch information
sorentwo committed Nov 13, 2024
1 parent 899c31c commit 14ec339
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 54 deletions.
32 changes: 32 additions & 0 deletions lib/oban/cron.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
defmodule Oban.Cron do
@moduledoc false

alias Oban.Cron.Expression

@spec schedule_interval(pid(), term(), binary(), Calendar.time_zone()) :: :ok
def schedule_interval(pid, message, schedule, timezone \\ "Etc/UTC") do
expression = Expression.parse!(schedule)

:timer.apply_after(interval_to_next_minute(), fn ->
now = DateTime.now!(timezone)

if Expression.now?(expression, now) do
send(pid, message)
end

schedule_interval(pid, message, schedule, timezone)
end)

:ok
end

@spec interval_to_next_minute(Time.t()) :: pos_integer()
def interval_to_next_minute(time \\ Time.utc_now()) do
time
|> Time.add(60)
|> Map.put(:second, 0)
|> Time.diff(time)
|> Integer.mod(86_400)
|> :timer.seconds()
end
end
12 changes: 2 additions & 10 deletions lib/oban/plugins/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ defmodule Oban.Plugins.Cron do
use GenServer

alias Oban.Cron.Expression
alias Oban.{Job, Peer, Plugin, Repo, Validation, Worker}
alias Oban.{Cron, Job, Peer, Plugin, Repo, Validation, Worker}
alias __MODULE__, as: State

@opaque expression :: Expression.t()
Expand Down Expand Up @@ -151,15 +151,7 @@ defmodule Oban.Plugins.Cron do
defdelegate parse(input), to: Expression

@doc false
@spec interval_to_next_minute(Time.t()) :: pos_integer()
def interval_to_next_minute(time \\ Time.utc_now()) do
time
|> Time.add(60)
|> Map.put(:second, 0)
|> Time.diff(time)
|> Integer.mod(86_400)
|> :timer.seconds()
end
defdelegate interval_to_next_minute(), to: Cron

@impl GenServer
def init(state) do
Expand Down
35 changes: 6 additions & 29 deletions lib/oban/plugins/reindexer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ defmodule Oban.Plugins.Reindexer do

use GenServer

alias Oban.Cron.Expression
alias Oban.Plugins.Cron
alias Oban.{Peer, Plugin, Repo, Validation}
alias Oban.{Cron, Peer, Plugin, Repo, Validation}
alias __MODULE__, as: State

@type option ::
Expand All @@ -60,9 +58,8 @@ defmodule Oban.Plugins.Reindexer do

defstruct [
:conf,
:schedule,
:timer,
indexes: ~w(oban_jobs_args_index oban_jobs_meta_index),
schedule: "@midnight",
timeout: :timer.seconds(15),
timezone: "Etc/UTC"
]
Expand All @@ -76,11 +73,6 @@ defmodule Oban.Plugins.Reindexer do
def start_link(opts) do
{name, opts} = Keyword.pop(opts, :name)

opts =
opts
|> Keyword.put_new(:schedule, "@midnight")
|> Keyword.update!(:schedule, &Expression.parse!/1)

GenServer.start_link(__MODULE__, struct!(State, opts), name: name)
end

Expand All @@ -102,14 +94,9 @@ defmodule Oban.Plugins.Reindexer do

:telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__})

{:ok, schedule_reindex(state)}
end
Cron.schedule_interval(self(), :reindex, state.schedule, state.timezone)

@impl GenServer
def terminate(_reason, %State{timer: timer}) do
if is_reference(timer), do: Process.cancel_timer(timer)

:ok
{:ok, state}
end

@impl GenServer
Expand All @@ -126,23 +113,13 @@ defmodule Oban.Plugins.Reindexer do
end
end)

{:noreply, schedule_reindex(state)}
end

# Scheduling

defp schedule_reindex(state) do
timer = Process.send_after(self(), :reindex, Cron.interval_to_next_minute())

%{state | timer: timer}
{:noreply, state}
end

# Reindexing

defp check_leadership_and_reindex(state) do
{:ok, datetime} = DateTime.now(state.timezone)

if Peer.leader?(state.conf) and Expression.now?(state.schedule, datetime) do
if Peer.leader?(state.conf) do
queries = [deindex_query(state) | Enum.map(state.indexes, &reindex_query(state, &1))]

Enum.reduce_while(queries, :ok, fn query, _ ->
Expand Down
20 changes: 20 additions & 0 deletions test/oban/cron_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
defmodule Oban.CronTest do
use ExUnit.Case, async: true

use ExUnitProperties

alias Oban.Cron

describe "interval_to_next_minute/1" do
property "calculated time is always within a short future range" do
check all hour <- integer(0..23),
minute <- integer(0..59),
second <- integer(0..59),
max_runs: 1_000 do
{:ok, time} = Time.new(hour, minute, second)

assert Cron.interval_to_next_minute(time) in 1_000..60_000
end
end
end
end
15 changes: 0 additions & 15 deletions test/oban/plugins/cron_test.exs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Oban.Plugins.CronTest do
use Oban.Case, async: true

use ExUnitProperties

alias Oban.Cron.Expression
alias Oban.Plugins.Cron
alias Oban.{Job, Registry, TelemetryHandler}
Expand Down Expand Up @@ -66,19 +64,6 @@ defmodule Oban.Plugins.CronTest do
end
end

describe "interval_to_next_minute/1" do
property "calculated time is always within a short future range" do
check all hour <- integer(0..23),
minute <- integer(0..59),
second <- integer(0..59),
max_runs: 1_000 do
{:ok, time} = Time.new(hour, minute, second)

assert Cron.interval_to_next_minute(time) in 1_000..60_000
end
end
end

test "cron jobs are enqueued on startup and telemetry events are emitted" do
TelemetryHandler.attach_events()

Expand Down

0 comments on commit 14ec339

Please sign in to comment.