diff --git a/lib/oban/cron.ex b/lib/oban/cron.ex new file mode 100644 index 00000000..00fa0863 --- /dev/null +++ b/lib/oban/cron.ex @@ -0,0 +1,32 @@ +defmodule Oban.Cron do + @moduledoc false + + alias Oban.Cron.Expression + + @spec schedule_interval(pid(), term(), binary(), Calendar.time_zone()) :: :ok + def schedule_interval(pid, message, schedule, timezone \\ "Etc/UTC") do + expression = Expression.parse!(schedule) + + :timer.apply_after(interval_to_next_minute(), fn -> + now = DateTime.now!(timezone) + + if Expression.now?(expression, now) do + send(pid, message) + end + + schedule_interval(pid, message, schedule, timezone) + end) + + :ok + end + + @spec interval_to_next_minute(Time.t()) :: pos_integer() + def interval_to_next_minute(time \\ Time.utc_now()) do + time + |> Time.add(60) + |> Map.put(:second, 0) + |> Time.diff(time) + |> Integer.mod(86_400) + |> :timer.seconds() + end +end diff --git a/lib/oban/plugins/cron.ex b/lib/oban/plugins/cron.ex index 9f465800..9bf7a159 100644 --- a/lib/oban/plugins/cron.ex +++ b/lib/oban/plugins/cron.ex @@ -66,7 +66,7 @@ defmodule Oban.Plugins.Cron do use GenServer alias Oban.Cron.Expression - alias Oban.{Job, Peer, Plugin, Repo, Validation, Worker} + alias Oban.{Cron, Job, Peer, Plugin, Repo, Validation, Worker} alias __MODULE__, as: State @opaque expression :: Expression.t() @@ -151,15 +151,7 @@ defmodule Oban.Plugins.Cron do defdelegate parse(input), to: Expression @doc false - @spec interval_to_next_minute(Time.t()) :: pos_integer() - def interval_to_next_minute(time \\ Time.utc_now()) do - time - |> Time.add(60) - |> Map.put(:second, 0) - |> Time.diff(time) - |> Integer.mod(86_400) - |> :timer.seconds() - end + defdelegate interval_to_next_minute(), to: Cron @impl GenServer def init(state) do diff --git a/lib/oban/plugins/reindexer.ex b/lib/oban/plugins/reindexer.ex index 82aab084..2749e405 100644 --- a/lib/oban/plugins/reindexer.ex +++ b/lib/oban/plugins/reindexer.ex @@ -46,9 +46,7 @@ defmodule Oban.Plugins.Reindexer do use GenServer - alias Oban.Cron.Expression - alias Oban.Plugins.Cron - alias Oban.{Peer, Plugin, Repo, Validation} + alias Oban.{Cron, Peer, Plugin, Repo, Validation} alias __MODULE__, as: State @type option :: @@ -60,9 +58,8 @@ defmodule Oban.Plugins.Reindexer do defstruct [ :conf, - :schedule, - :timer, indexes: ~w(oban_jobs_args_index oban_jobs_meta_index), + schedule: "@midnight", timeout: :timer.seconds(15), timezone: "Etc/UTC" ] @@ -76,11 +73,6 @@ defmodule Oban.Plugins.Reindexer do def start_link(opts) do {name, opts} = Keyword.pop(opts, :name) - opts = - opts - |> Keyword.put_new(:schedule, "@midnight") - |> Keyword.update!(:schedule, &Expression.parse!/1) - GenServer.start_link(__MODULE__, struct!(State, opts), name: name) end @@ -102,14 +94,9 @@ defmodule Oban.Plugins.Reindexer do :telemetry.execute([:oban, :plugin, :init], %{}, %{conf: state.conf, plugin: __MODULE__}) - {:ok, schedule_reindex(state)} - end + Cron.schedule_interval(self(), :reindex, state.schedule, state.timezone) - @impl GenServer - def terminate(_reason, %State{timer: timer}) do - if is_reference(timer), do: Process.cancel_timer(timer) - - :ok + {:ok, state} end @impl GenServer @@ -126,23 +113,13 @@ defmodule Oban.Plugins.Reindexer do end end) - {:noreply, schedule_reindex(state)} - end - - # Scheduling - - defp schedule_reindex(state) do - timer = Process.send_after(self(), :reindex, Cron.interval_to_next_minute()) - - %{state | timer: timer} + {:noreply, state} end # Reindexing defp check_leadership_and_reindex(state) do - {:ok, datetime} = DateTime.now(state.timezone) - - if Peer.leader?(state.conf) and Expression.now?(state.schedule, datetime) do + if Peer.leader?(state.conf) do queries = [deindex_query(state) | Enum.map(state.indexes, &reindex_query(state, &1))] Enum.reduce_while(queries, :ok, fn query, _ -> diff --git a/test/oban/cron_test.exs b/test/oban/cron_test.exs new file mode 100644 index 00000000..d943144c --- /dev/null +++ b/test/oban/cron_test.exs @@ -0,0 +1,20 @@ +defmodule Oban.CronTest do + use ExUnit.Case, async: true + + use ExUnitProperties + + alias Oban.Cron + + describe "interval_to_next_minute/1" do + property "calculated time is always within a short future range" do + check all hour <- integer(0..23), + minute <- integer(0..59), + second <- integer(0..59), + max_runs: 1_000 do + {:ok, time} = Time.new(hour, minute, second) + + assert Cron.interval_to_next_minute(time) in 1_000..60_000 + end + end + end +end diff --git a/test/oban/plugins/cron_test.exs b/test/oban/plugins/cron_test.exs index 52f79433..93c0e02b 100644 --- a/test/oban/plugins/cron_test.exs +++ b/test/oban/plugins/cron_test.exs @@ -1,8 +1,6 @@ defmodule Oban.Plugins.CronTest do use Oban.Case, async: true - use ExUnitProperties - alias Oban.Cron.Expression alias Oban.Plugins.Cron alias Oban.{Job, Registry, TelemetryHandler} @@ -66,19 +64,6 @@ defmodule Oban.Plugins.CronTest do end end - describe "interval_to_next_minute/1" do - property "calculated time is always within a short future range" do - check all hour <- integer(0..23), - minute <- integer(0..59), - second <- integer(0..59), - max_runs: 1_000 do - {:ok, time} = Time.new(hour, minute, second) - - assert Cron.interval_to_next_minute(time) in 1_000..60_000 - end - end - end - test "cron jobs are enqueued on startup and telemetry events are emitted" do TelemetryHandler.attach_events()