Skip to content

Commit

Permalink
Remove job casting from building test queries
Browse files Browse the repository at this point in the history
The use of `Job.new` to normalaize query fields would change assertions
with a "scheduled_at" date to _only_ check scheduled, never "available".
As a bonus, this also cleans up applying where clauses for assertions.

Closes #872
  • Loading branch information
sorentwo committed Apr 3, 2023
1 parent 9ed1aba commit f8e5785
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 64 deletions.
94 changes: 39 additions & 55 deletions lib/oban/testing.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ defmodule Oban.Testing do
@moduledoc since: "0.3.0"

import ExUnit.Assertions, only: [assert: 2, refute: 2, flunk: 1]
import Ecto.Query, only: [order_by: 2, select: 2, where: 2, where: 3]
import Ecto.Query, only: [order_by: 2, select: 3, where: 2, where: 3]

alias Ecto.Changeset

Expand Down Expand Up @@ -496,6 +496,17 @@ defmodule Oban.Testing do
"""
end

defp extract_conf(repo, opts) do
{conf_opts, opts} = Keyword.split(opts, [:prefix])

conf =
conf_opts
|> Keyword.put(:repo, repo)
|> Config.new()

{conf, opts}
end

# Enqueued Helpers

defp job_exists?(repo, opts) do
Expand All @@ -519,71 +530,44 @@ defmodule Oban.Testing do
defp available_jobs(repo, opts) do
{conf, opts} = extract_conf(repo, opts)

fields = Keyword.keys(opts)
query =
[]
|> base_query()
|> select([j], map(j, ^Keyword.keys(opts)))

conf
|> Repo.all([] |> base_query() |> select(^fields))
|> Enum.map(&Map.take(&1, fields))
Repo.all(conf, query)
end

defp base_query(opts) do
fields_with_opts = normalize_fields(opts)
query =
Job
|> where([j], j.state in ["available", "scheduled"])
|> order_by(desc: :id)

Job
|> where([j], j.state in ["available", "scheduled"])
|> apply_where_clauses(fields_with_opts)
|> order_by(desc: :id)
Enum.reduce(opts, query, &apply_where/2)
end

defp extract_conf(repo, opts) do
{conf_opts, opts} = Keyword.split(opts, [:prefix])
defp apply_where({:args, args}, query), do: where(query, args: ^json_encode_decode(args))
defp apply_where({:meta, meta}, query), do: where(query, meta: ^json_encode_decode(meta))
defp apply_where({:queue, queue}, query), do: where(query, queue: ^to_string(queue))
defp apply_where({:state, state}, query), do: where(query, state: ^to_string(state))
defp apply_where({:worker, worker}, query), do: where(query, worker: ^Worker.to_string(worker))

conf =
conf_opts
|> Keyword.put(:repo, repo)
|> Config.new()

{conf, opts}
end

defp normalize_fields(opts) do
{fields, field_opts} = Enum.map_reduce(opts, [], &extract_field_opts/2)

args = Keyword.get(fields, :args, %{})
keys = Keyword.keys(fields)

args
|> Job.new(fields)
|> Changeset.apply_changes()
|> Map.from_struct()
|> Map.take(keys)
|> Enum.map(fn {key, value} -> {key, value, Keyword.get(field_opts, key, [])} end)
end
defp apply_where({key, val}, query) when key in @timestamp_fields do
{time, delta} =
case val do
{time, delta: delta} ->
{time, delta}

defp extract_field_opts({key, {value, field_opts}}, field_opts_acc) do
{{key, value}, [{key, field_opts} | field_opts_acc]}
end

defp extract_field_opts({key, value}, field_opts_acc) do
{{key, value}, field_opts_acc}
end

defp apply_where_clauses(query, []), do: query

defp apply_where_clauses(query, [{key, value, opts} | rest]) when key in @timestamp_fields do
delta = Keyword.get(opts, :delta, 1)
time ->
{time, 1}
end

window_start = DateTime.add(value, -delta, :second)
window_end = DateTime.add(value, delta, :second)
begin = DateTime.add(time, -delta, :second)
until = DateTime.add(time, +delta, :second)

query
|> where([j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^window_start, ^window_end))
|> apply_where_clauses(rest)
where(query, [j], fragment("? BETWEEN ? AND ?", field(j, ^key), ^begin, ^until))
end

defp apply_where_clauses(query, [{key, value, _opts} | rest]) do
query
|> where(^[{key, value}])
|> apply_where_clauses(rest)
end
defp apply_where({key, value}, query), do: where(query, ^[{key, value}])
end
15 changes: 6 additions & 9 deletions test/oban/testing_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -252,16 +252,13 @@ defmodule Oban.TestingTest do
assert_enqueued worker: Ping, prefix: "public"
end

test "checking for jobs with matching timestamps with delta" do
insert!(%{}, worker: Ping, scheduled_at: seconds_from_now(60))
test "checking for jobs with matching timestamps within delta" do
insert!(%{}, worker: Ping)
insert!(%{}, worker: Pong, scheduled_at: seconds_from_now(60))

assert_enqueued worker: Ping, scheduled_at: seconds_from_now(60)
end

test "checking for jobs allows to configure timestamp delta" do
insert!(%{}, worker: Ping, scheduled_at: seconds_from_now(60))

assert_enqueued worker: Ping, scheduled_at: {seconds_from_now(69), delta: 10}
assert_enqueued worker: Ping, state: "available", scheduled_at: DateTime.utc_now()
assert_enqueued worker: Pong, scheduled_at: seconds_from_now(60)
assert_enqueued worker: Pong, scheduled_at: {seconds_from_now(69), delta: 10}
end

test "asserting that jobs are now or will eventually be enqueued" do
Expand Down

0 comments on commit f8e5785

Please sign in to comment.