diff --git a/lib/oban/testing.ex b/lib/oban/testing.ex index 76ffb9d4..9c3617ee 100644 --- a/lib/oban/testing.ex +++ b/lib/oban/testing.ex @@ -116,7 +116,7 @@ defmodule Oban.Testing do @moduledoc since: "0.3.0" import ExUnit.Assertions, only: [assert: 2, refute: 2, flunk: 1] - import Ecto.Query, only: [order_by: 2, select: 2, where: 2, where: 3] + import Ecto.Query, only: [order_by: 2, select: 3, where: 2, where: 3] alias Ecto.Changeset @@ -496,6 +496,17 @@ defmodule Oban.Testing do """ end + defp extract_conf(repo, opts) do + {conf_opts, opts} = Keyword.split(opts, [:prefix]) + + conf = + conf_opts + |> Keyword.put(:repo, repo) + |> Config.new() + + {conf, opts} + end + # Enqueued Helpers defp job_exists?(repo, opts) do @@ -519,71 +530,44 @@ defmodule Oban.Testing do defp available_jobs(repo, opts) do {conf, opts} = extract_conf(repo, opts) - fields = Keyword.keys(opts) + query = + [] + |> base_query() + |> select([j], map(j, ^Keyword.keys(opts))) - conf - |> Repo.all([] |> base_query() |> select(^fields)) - |> Enum.map(&Map.take(&1, fields)) + Repo.all(conf, query) end defp base_query(opts) do - fields_with_opts = normalize_fields(opts) + query = + Job + |> where([j], j.state in ["available", "scheduled"]) + |> order_by(desc: :id) - Job - |> where([j], j.state in ["available", "scheduled"]) - |> apply_where_clauses(fields_with_opts) - |> order_by(desc: :id) + Enum.reduce(opts, query, &apply_where/2) end - defp extract_conf(repo, opts) do - {conf_opts, opts} = Keyword.split(opts, [:prefix]) + defp apply_where({:args, args}, query), do: where(query, args: ^json_encode_decode(args)) + defp apply_where({:meta, meta}, query), do: where(query, meta: ^json_encode_decode(meta)) + defp apply_where({:queue, queue}, query), do: where(query, queue: ^to_string(queue)) + defp apply_where({:state, state}, query), do: where(query, state: ^to_string(state)) + defp apply_where({:worker, worker}, query), do: where(query, worker: ^Worker.to_string(worker)) - conf = - conf_opts - |> Keyword.put(:repo, repo) - |> Config.new() - - {conf, opts} - end - - defp normalize_fields(opts) do - {fields, field_opts} = Enum.map_reduce(opts, [], &extract_field_opts/2) - - args = Keyword.get(fields, :args, %{}) - keys = Keyword.keys(fields) - - args - |> Job.new(fields) - |> Changeset.apply_changes() - |> Map.from_struct() - |> Map.take(keys) - |> Enum.map(fn {key, value} -> {key, value, Keyword.get(field_opts, key, [])} end) - end + defp apply_where({key, val}, query) when key in @timestamp_fields do + {time, delta} = + case val do + {time, delta: delta} -> + {time, delta} - defp extract_field_opts({key, {value, field_opts}}, field_opts_acc) do - {{key, value}, [{key, field_opts} | field_opts_acc]} - end - - defp extract_field_opts({key, value}, field_opts_acc) do - {{key, value}, field_opts_acc} - end - - defp apply_where_clauses(query, []), do: query - - defp apply_where_clauses(query, [{key, value, opts} | rest]) when key in @timestamp_fields do - delta = Keyword.get(opts, :delta, 1) + time -> + {time, 1} + end - window_start = DateTime.add(value, -delta, :second) - window_end = DateTime.add(value, delta, :second) + begin = DateTime.add(time, -delta, :second) + until = DateTime.add(time, +delta, :second) - query - |> where([j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^window_start, ^window_end)) - |> apply_where_clauses(rest) + where(query, [j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^begin, ^until)) end - defp apply_where_clauses(query, [{key, value, _opts} | rest]) do - query - |> where(^[{key, value}]) - |> apply_where_clauses(rest) - end + defp apply_where({key, value}, query), do: where(query, ^[{key, value}]) end diff --git a/test/oban/testing_test.exs b/test/oban/testing_test.exs index 78fd0b4d..19daf266 100644 --- a/test/oban/testing_test.exs +++ b/test/oban/testing_test.exs @@ -252,16 +252,13 @@ defmodule Oban.TestingTest do assert_enqueued worker: Ping, prefix: "public" end - test "checking for jobs with matching timestamps with delta" do - insert!(%{}, worker: Ping, scheduled_at: seconds_from_now(60)) + test "checking for jobs with matching timestamps within delta" do + insert!(%{}, worker: Ping) + insert!(%{}, worker: Pong, scheduled_at: seconds_from_now(60)) - assert_enqueued worker: Ping, scheduled_at: seconds_from_now(60) - end - - test "checking for jobs allows to configure timestamp delta" do - insert!(%{}, worker: Ping, scheduled_at: seconds_from_now(60)) - - assert_enqueued worker: Ping, scheduled_at: {seconds_from_now(69), delta: 10} + assert_enqueued worker: Ping, state: "available", scheduled_at: DateTime.utc_now() + assert_enqueued worker: Pong, scheduled_at: seconds_from_now(60) + assert_enqueued worker: Pong, scheduled_at: {seconds_from_now(69), delta: 10} end test "asserting that jobs are now or will eventually be enqueued" do