Skip to content

Commit

Permalink
Add :timestamp option for unique period queries
Browse files Browse the repository at this point in the history
Jobs are frequently inserted and scheduled for a time far in the future.
It's often desirable for to consider scheduled jobs for uniqueness,
but unique jobs only checked the `:inserted_at` timestamp.

Now `unique` has a `timestamp` option that allows checking the
`:scheduled_at` timestamp instead.

Closes #927
  • Loading branch information
sorentwo committed Jul 28, 2023
1 parent 64b998d commit de38823
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 28 deletions.
16 changes: 12 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,9 @@ end

## Unique Jobs

The unique jobs feature lets you specify constraints to prevent enqueueing
duplicate jobs. Uniqueness is based on a combination of `args`, `queue`,
`worker`, `state` and insertion time. It is configured at the worker or job
level using the following options:
The unique jobs feature lets you specify constraints to prevent enqueueing duplicate jobs.
Uniqueness is based on a combination of `args`, `queue`, `worker`, `state` and insertion time. It
is configured at the worker or job level using the following options:

* `:period` — The number of seconds until a job is no longer considered duplicate. You should
always specify a period, otherwise Oban will default to 60 seconds. `:infinity` can be used to
Expand All @@ -475,6 +474,9 @@ level using the following options:
`:discarded`. By default all states except for `:discarded` and `:cancelled` are checked, which
prevents duplicates even if the previous job has been completed.

* `:timestamp` — Which timestamp to check the period against. The available timestamps are
`:inserted_at` or `:scheduled_at`, and it defaults to `:inserted_at` for legacy reasons.

For example, configure a worker to be unique across all fields and states for 60
seconds:

Expand All @@ -488,6 +490,12 @@ Configure the worker to be unique only by `:worker` and `:queue`:
use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]
```

Check the `:scheduled_at` timestamp instead of `:inserted_at` for uniqueness:

```elixir
use Oban.Worker, unique: [period: 120, timestamp: :scheduled_at]
```

Or, configure a worker to be unique until it has executed:

```elixir
Expand Down
10 changes: 5 additions & 5 deletions lib/oban/engines/basic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ defmodule Oban.Engines.Basic do
end

defp unique_query(%{changes: %{unique: %{} = unique}} = changeset) do
%{fields: fields, keys: keys, period: period, states: states} = unique
%{fields: fields, keys: keys, period: period, states: states, timestamp: timestamp} = unique

keys = Enum.map(keys, &to_string/1)
states = Enum.map(states, &to_string/1)
Expand All @@ -372,7 +372,7 @@ defmodule Oban.Engines.Basic do
query =
Job
|> where([j], j.state in ^states)
|> since_period(period)
|> since_period(period, timestamp)
|> where(^dynamic)
|> limit(1)

Expand Down Expand Up @@ -410,10 +410,10 @@ defmodule Oban.Engines.Basic do
end
end

defp since_period(query, :infinity), do: query
defp since_period(query, :infinity, _timestamp), do: query

defp since_period(query, period) do
where(query, [j], j.inserted_at >= ^seconds_from_now(-period))
defp since_period(query, period, timestamp) do
where(query, [j], field(j, ^timestamp) >= ^seconds_from_now(-period))
end

defp acquire_lock(conf, base_key) do
Expand Down
4 changes: 2 additions & 2 deletions lib/oban/engines/lite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ defmodule Oban.Engines.Lite do
# Insertion

defp fetch_unique(conf, %{changes: %{unique: %{} = unique}} = changeset) do
%{fields: fields, keys: keys, period: period, states: states} = unique
%{fields: fields, keys: keys, period: period, states: states, timestamp: timestamp} = unique

keys = Enum.map(keys, &to_string/1)
states = Enum.map(states, &to_string/1)
Expand Down Expand Up @@ -311,7 +311,7 @@ defmodule Oban.Engines.Lite do
query =
Job
|> where([j], j.state in ^states)
|> where([j], fragment("datetime(?) >= datetime(?)", j.inserted_at, ^since))
|> where([j], fragment("datetime(?) >= datetime(?)", field(j, ^timestamp), ^since))
|> where(^dynamic)
|> limit(1)

Expand Down
32 changes: 18 additions & 14 deletions lib/oban/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule Oban.Job do
| {:keys, [atom()]}
| {:period, unique_period()}
| {:states, [unique_state()]}
| {:timestamp, :inserted_at | :scheduled_at}

@type replace_option :: [
:args
Expand Down Expand Up @@ -206,46 +207,46 @@ defmodule Oban.Job do
|> Oban.Job.new(queue: :default, worker: MyApp.Worker)
|> Oban.insert()
Generate a pre-configured job for `MyApp.Worker` and push it:
Generate a pre-configured job for `MyApp.Worker`:
%{id: 1, user_id: 2} |> MyApp.Worker.new() |> Oban.insert()
MyApp.Worker.new(%{id: 1, user_id: 2})
Schedule a job to run in 5 seconds:
%{id: 1} |> MyApp.Worker.new(schedule_in: 5) |> Oban.insert()
MyApp.Worker.new(%{id: 1}, schedule_in: 5)
Schedule a job to run in 5 minutes:
%{id: 1} |> MyApp.Worker.new(schedule_in: {5, :minutes}) |> Oban.insert()
MyApp.Worker.new(%{id: 1}, schedule_in: {5, :minutes})
Insert a job, ensuring that it is unique within the past minute:
%{id: 1} |> MyApp.Worker.new(unique: [period: 60]) |> Oban.insert()
MyApp.Worker.new(%{id: 1}, unique: [period: 60])
Insert a unique job where the period is compared to the `scheduled_at` timestamp rather than
`inserted_at`:
MyApp.Worker.new(%{id: 1}, unique: [period: 60, timestamp: :scheduled_at])
Insert a unique job based only on the worker field, and within multiple states:
fields = [:worker]
states = [:available, :scheduled, :executing, :retryable, :completed]
%{id: 1}
|> MyApp.Worker.new(unique: [fields: fields, period: 60, states: states])
|> Oban.insert()
MyApp.Worker.new(%{id: 1}, unique: [fields: fields, period: 60, states: states])
Insert a unique job considering only the worker and specified keys in the args:
keys = [:account_id, :url]
args = %{account_id: 1, url: "https://example.com"}
%{account_id: 1, url: "https://example.com"}
|> MyApp.Worker.new(unique: [fields: [:args, :worker], keys: keys])
|> Oban.insert()
MyApp.Worker.new(args, unique: [fields: [:args, :worker], keys: keys])
Insert a unique job considering only specified keys in the meta:
unique = [fields: [:meta], keys: [:slug]]
%{id: 1}
|> MyApp.Worker.new(meta: %{slug: "unique-key"}, unique: unique)
|> Oban.insert()
MyApp.Worker.new(%{id: 1}, meta: %{slug: "unique-key"}, unique: unique)
"""
@doc since: "0.1.0"
@spec new(args(), [option()]) :: changeset()
Expand Down Expand Up @@ -279,6 +280,7 @@ defmodule Oban.Job do
@unique_fields ~w(args queue worker)a
@unique_period 60
@unique_states ~w(scheduled available executing retryable completed)a
@unique_timestamp :inserted_at

@doc """
A canonical list of all possible job states.
Expand Down Expand Up @@ -372,6 +374,7 @@ defmodule Oban.Job do
def valid_unique_opt?({:period, :infinity}), do: true
def valid_unique_opt?({:period, period}), do: is_integer(period) and period > 0
def valid_unique_opt?({:states, [_ | _] = states}), do: states -- states() == []
def valid_unique_opt?({:timestamp, stamp}), do: stamp in ~w(inserted_at scheduled_at)a
def valid_unique_opt?(_option), do: false

@time_units ~w(
Expand Down Expand Up @@ -446,6 +449,7 @@ defmodule Oban.Job do
|> Map.put_new(:keys, [])
|> Map.put_new(:period, @unique_period)
|> Map.put_new(:states, @unique_states)
|> Map.put_new(:timestamp, @unique_timestamp)

case validate_unique_opts(unique) do
:ok ->
Expand Down
15 changes: 15 additions & 0 deletions test/oban/engine_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,21 @@ for engine <- [Oban.Engines.Basic, Oban.Engines.Lite] do
assert job_9.id in [job_3.id, job_6.id]
end

@tag :unique
test "scoping uniqueness by period compared to the scheduled time", %{name: name} do
job_1 = insert!(name, %{id: 1}, scheduled_at: seconds_ago(120))

uniq_insert = fn args, period, timestamp ->
Oban.insert(name, Worker.new(args, unique: [period: period, timestamp: timestamp]))
end

assert {:ok, job_2} = uniq_insert.(%{id: 1}, 121, :scheduled_at)
assert {:ok, job_3} = uniq_insert.(%{id: 1}, 119, :scheduled_at)

assert job_1.id == job_2.id
assert job_1.id != job_3.id
end

@tag :unique
test "replacing fields on unique conflict", %{name: name} do
four_seconds = seconds_from_now(4)
Expand Down
12 changes: 9 additions & 3 deletions test/oban/job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,24 @@ defmodule Oban.JobTest do
fields: [:args, :queue, :worker],
keys: [],
period: 60,
states: [:scheduled, :available, :executing, :retryable, :completed]
states: [:scheduled, :available, :executing, :retryable, :completed],
timestamp: :inserted_at
}
end

test "overriding unique defaults" do
changeset =
Job.new(%{}, worker: Fake, unique: [fields: [:meta, :worker], states: [:available]])
Job.new(%{},
worker: Fake,
unique: [fields: [:meta, :worker], states: [:available], timestamp: :scheduled_at]
)

assert changeset.changes[:unique] == %{
fields: [:meta, :worker],
keys: [],
period: 60,
states: [:available]
states: [:available],
timestamp: :scheduled_at
}
end

Expand All @@ -92,6 +97,7 @@ defmodule Oban.JobTest do
assert Job.new(%{}, worker: Fake, unique: [keys: [[]]]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [period: :bogus]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [states: [:random]]).errors[:unique]
assert Job.new(%{}, worker: Fake, unique: [timestamp: :updated_at]).errors[:unique]
end
end

Expand Down

0 comments on commit de38823

Please sign in to comment.