Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ordered queue handling by workers #665

Merged
merged 6 commits into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ By default, GoodJob creates a single thread execution pool that will execute job

A pool is configured with the following syntax `<participating_queues>:<thread_count>`:

- `<participating_queues>`: either `queue1,queue2` (only those queues), `*` (all) or `-queue1,queue2` (all except those queues).
- `<participating_queues>`: either `queue1,queue2` (only those queues), `+queue1,queue2` (only those queues, and processed in order), `*` (all) or `-queue1,queue2` (all except those queues).
- `<thread_count>`: a count overriding for this specific pool the global `max-threads`.

Pool configurations are separated with a semicolon (;) in the `queues` configuration
Expand All @@ -683,6 +683,12 @@ By default, GoodJob creates a single thread execution pool that will execute job
- `-transactional_messages,batch_processing`: execute jobs enqueued on _any_ queue _excluding_ `transactional_messages` or `batch_processing`, with up to 2 threads.
- `*`: execute jobs on any queue, with up to 5 threads (as configured by `--max-threads=5`).

When a pool is performing jobs from multiple queues, jobs will be performed from specified queues, ordered by priority and creation time. To perform jobs from queues in the queues' given order, use the `+` modifier. In this example, jobs in `batch_processing` will be performed only when there are no jobs in `transactional_messages`:

```bash
bundle exec good_job --queues="+transactional_messages,batch_processing"
```

Configuration can be injected by environment variables too:

```bash
Expand Down
2 changes: 1 addition & 1 deletion lib/good_job/job_performer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def name
# Perform the next eligible job
# @return [Object, nil] Returns job result or +nil+ if no job was found
def next
job_query.perform_with_advisory_lock
job_query.perform_with_advisory_lock(parsed_queues: parsed_queues)
end

# Tests whether this performer should be used in GoodJob's current state.
Expand Down
47 changes: 41 additions & 6 deletions lib/models/good_job/execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,21 @@ class Execution < BaseRecord
# not match.
# - +{ include: Array<String> }+ indicates the listed queue names should
# match.
# - +{ include: Array<String>, ordered_queues: true }+ indicates the listed
# queue names should match, and dequeue should respect queue order.
# @example
# GoodJob::Execution.queue_parser('-queue1,queue2')
# => { exclude: [ 'queue1', 'queue2' ] }
def self.queue_parser(string)
string = string.presence || '*'

if string.first == '-'
case string.first
when '-'
exclude_queues = true
string = string[1..-1]
when '+'
ordered_queues = true
string = string[1..-1]
end

queues = string.split(',').map(&:strip)
Expand All @@ -46,6 +52,11 @@ def self.queue_parser(string)
{ all: true }
elsif exclude_queues
{ exclude: queues }
elsif ordered_queues
{
include: queues,
ordered_queues: true,
}
else
{ include: queues }
end
Expand Down Expand Up @@ -95,10 +106,34 @@ def self.queue_parser(string)
scope :creation_ordered, -> { order('created_at ASC') }

# Order jobs for de-queueing
# @!method dequeue_ordered
# @!method dequeueing_ordered
# @!scope class
# @return [ActiveRecord:Relation]
scope :dequeue_ordered, -> { priority_ordered.creation_ordered }
# @param parsed_queues [Hash, nil]
#   optional output of .queue_parser; when it includes +ordered_queues: true+,
#   jobs are ordered by the position of their queue in +:include+ first.
# @return [ActiveRecord::Relation]
scope :dequeueing_ordered, (lambda do |parsed_queues|
  # Queue names are only used for sorting when the parsed configuration
  # explicitly asks for ordered queues and lists the queues to include.
  ordered_names = parsed_queues[:include] if parsed_queues && parsed_queues[:ordered_queues]
  base = ordered_names ? queue_ordered(ordered_names) : self

  # Priority, then creation time, always apply after any queue ordering.
  base.priority_ordered.creation_ordered
end)

# Order jobs by the position of their queue in the given list; jobs whose
# queue is not listed sort after all listed queues.
# @!method queue_ordered
# @!scope class
# @param queues [Array<String>] ordered names of queues
# @return [ActiveRecord::Relation]
scope :queue_ordered, (lambda do |queues|
  # A CASE expression with no WHEN branch is invalid SQL. Returning nil makes
  # the scope fall back to the unmodified relation.
  return if queues.blank?

  clauses = queues.map.with_index do |queue_name, index|
    # Quote through the connection so a queue name containing a quote cannot
    # break (or inject into) the ORDER BY expression.
    "WHEN queue_name = #{connection.quote(queue_name)} THEN #{index}"
  end

  order(
    Arel.sql("(CASE #{clauses.join(' ')} ELSE #{queues.length} END)")
  )
end)

# Order jobs by scheduled or created (oldest first).
# @!method schedule_ordered
Expand Down Expand Up @@ -165,8 +200,8 @@ def self.queue_parser(string)
# return value for the job's +#perform+ method, and the exception the job
# raised, if any (if the job raised, then the second array entry will be
# +nil+). If there were no jobs to execute, returns +nil+.
def self.perform_with_advisory_lock
unfinished.dequeue_ordered.only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |executions|
def self.perform_with_advisory_lock(parsed_queues: nil)
unfinished.dequeueing_ordered(parsed_queues).only_scheduled.limit(1).with_advisory_lock(unlock_session: true) do |executions|
execution = executions.first
break if execution.blank?
break :unlocked unless execution&.executable?
Expand Down
41 changes: 32 additions & 9 deletions scripts/benchmark_job_throughput.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# To run:
# bundle exec ruby scripts/benchmark_example.rb
# bundle exec ruby scripts/benchmark_job_throughput.rb
#

ENV['GOOD_JOB_EXECUTION_MODE'] = 'external'
Expand All @@ -12,13 +12,14 @@
booleans = [true, false]
priorities = (1..10).to_a
scheduled_minutes = (-60..60).to_a
queue_names = ["one", "two", "three"]

GoodJob::Execution.delete_all
puts "Seeding database"
executions_data = Array.new(10_000) do |i|
puts "Initializing seed record ##{i}" if (i % 1_000).zero?
{
queue_name: 'default',
queue_name: queue_names.sample,
priority: priorities.sample,
scheduled_at: booleans.sample ? scheduled_minutes.sample.minutes.ago : nil,
created_at: 90.minutes.ago,
Expand All @@ -31,24 +32,46 @@
GoodJob::Execution.insert_all(executions_data)

# ActiveRecord::Base.connection.execute('SET enable_seqscan = OFF')
# puts GoodJob::Execution.unfinished.dequeue_ordered.only_scheduled(use_coalesce: true).limit(1).advisory_lock.explain(analyze: true)
# exit!

Benchmark.ips do |x|
x.report("with priority only") do
x.report("without sorts") do
GoodJob::Execution.unfinished.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end

x.report("sort by priority only") do
GoodJob::Execution.unfinished.priority_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end

x.report("without priority") do
GoodJob::Execution.unfinished.only_scheduled.limit(1).with_advisory_lock do |executions|
x.report("sort by creation only") do
GoodJob::Execution.unfinished.creation_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end

x.report("sort by priority and creation") do
GoodJob::Execution.unfinished.priority_ordered.creation_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end

x.report("sort by ordered queues only") do
  # Only the queue-order sort is applied, so this report isolates its cost.
  GoodJob::Execution.unfinished.queue_ordered(%w{one two three}).only_scheduled.limit(1).with_advisory_lock do |executions|
    # executions.first&.destroy!
  end
end

x.report("sort by ordered queues and creation") do
GoodJob::Execution.unfinished.queue_ordered(%w{one two three}).creation_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end

x.report("with priority and FIFO") do
GoodJob::Execution.unfinished.dequeue_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
x.report("sort by ordered queues, priority, and creation") do
GoodJob::Execution.unfinished.queue_ordered(%w{one two three}).priority_ordered.creation_ordered.only_scheduled.limit(1).with_advisory_lock do |executions|
# executions.first&.destroy!
end
end
Expand Down
35 changes: 34 additions & 1 deletion spec/lib/models/good_job/execution_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ def job_params
4.times do
described_class.perform_with_advisory_lock
end

expect(described_class.all.order(finished_at: :asc).to_a).to eq([
high_priority_job,
low_priority_job,
Expand All @@ -108,6 +107,26 @@ def job_params
])
end
end

context "with multiple jobs and ordered queues" do
  # Baseline attributes for an execution record; individual jobs override
  # queue_name, created_at and priority.
  def job_params
    {
      active_job_id: SecureRandom.uuid,
      queue_name: "default",
      priority: 0,
      serialized_params: { job_class: "TestJob" },
    }
  end

  let(:parsed_queues) { { include: %w[one two], ordered_queues: true } }

  # The "two" job is created earlier and has the larger priority value.
  # NOTE(review): presumably it would be dequeued first without queue
  # ordering -- confirm against priority_ordered.
  let!(:queue_two_job) { described_class.create!(job_params.merge(queue_name: "two", created_at: 10.minutes.ago, priority: 100)) }
  let!(:queue_one_job) { described_class.create!(job_params.merge(queue_name: "one", created_at: 1.minute.ago, priority: 1)) }

  it "orders by queue order" do
    2.times { described_class.perform_with_advisory_lock(parsed_queues: parsed_queues) }

    performed = described_class.all.order(finished_at: :asc).to_a
    expect(performed).to eq([queue_one_job, queue_two_job])
  end
end
end

describe '.queue_parser' do
Expand All @@ -126,6 +145,11 @@ def job_params
expect(result).to eq({
all: true,
})
result = described_class.queue_parser('+first,second')
expect(result).to eq({
include: %w[first second],
ordered_queues: true,
})
end
end

Expand All @@ -146,6 +170,15 @@ def job_params
end
end

describe '.queue_ordered' do
  it "produces SQL to order by queue order" do
    # Each listed queue maps to its index; any other queue maps to the list length.
    expected_order = "ORDER BY (CASE WHEN queue_name = 'one' THEN 0 WHEN queue_name = 'two' THEN 1 WHEN queue_name = 'three' THEN 2 ELSE 3 END)"

    generated_sql = described_class.queue_ordered(%w[one two three]).to_sql
    expect(generated_sql).to include(expected_order)
  end
end

describe '.next_scheduled_at' do
let(:active_job) { TestJob.new }

Expand Down
2 changes: 1 addition & 1 deletion spec/test_app/db/schema.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#
# It's strongly recommended that you check this file into your version control system.

ActiveRecord::Schema.define(version: 2022_01_02_200139) do
ActiveRecord::Schema.define(version: 2022_06_30_200758) do

# These are extensions that must be enabled in order to support this database
enable_extension "pgcrypto"
Expand Down