Skip to content

Commit

Permalink
fix flaky specs
Browse files Browse the repository at this point in the history
  • Loading branch information
ankithads committed Jun 18, 2024
1 parent 6ba5be1 commit d92a84a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
9 changes: 4 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,12 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby_version: ["2.6", "2.7", "3.0"]
postgres-version: ["12", "13", "14", "15", "16"]
ruby_version: ["3.0.2"]

runs-on: ubuntu-latest
services:
postgres:
image: postgres:${{ matrix.postgres-version }}
image: postgres:14.2
env:
POSTGRES_DB: que-test
POSTGRES_USER: ubuntu
Expand All @@ -27,7 +26,7 @@ jobs:
--health-timeout 5s
--health-retries 10
lock_database:
image: postgres:${{ matrix.postgres-version }}
image: postgres:14.2
env:
POSTGRES_DB: lock-test
POSTGRES_USER: ubuntu
Expand Down Expand Up @@ -58,4 +57,4 @@ jobs:
ruby-version: "${{ matrix.ruby-version }}"
- name: Run specs
run: |
bundle exec rspec
bundle exec rspec
8 changes: 5 additions & 3 deletions lib/que/locker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def lock_job_with_lock_database(queue, cursor)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql

result = Que.execute(query).first
return result if result.nil?

Expand All @@ -188,8 +188,10 @@ def locked?(job_id)

def handle_expired_cursors!
@consolidated_queues.each do |queue|
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)
reset_cursor_for!(queue) if queue_cursor_expires_at < monotonic_now
queue_cursor_expires_at = @queue_expires_at.fetch(queue, monotonic_now)


reset_cursor_for!(queue) if queue_cursor_expires_at <= monotonic_now
end
end

Expand Down
27 changes: 18 additions & 9 deletions spec/lib/que/locker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ def expect_to_lock_with(cursor:)
end

context "with just one job to lock" do
before do
described_class.instance_variable_set(:@queue_cursors, [0])
end
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
let(:cursor_expiry) { 60 }

Expand Down Expand Up @@ -95,7 +98,9 @@ def expect_to_lock_with(cursor:)
context "with non-zero cursor expiry" do
let(:cursor_expiry) { 5 }

before { allow(locker).to receive(:monotonic_now) { @epoch } }
before do
allow(locker).to receive(:monotonic_now) { @epoch }
end

# This test simulates the repeated locking of jobs. We're trying to prove that
# the locker will use the previous jobs ID as a cursor until the expiry has
Expand All @@ -112,7 +117,8 @@ def expect_to_lock_with(cursor:)
expect_to_lock_with(cursor: job_1[:job_id])
expect_to_work(job_2)

@epoch += cursor_expiry # our cursor should now expire
@epoch += (cursor_expiry) # our cursor should now expire
# puts @epoch
expect_to_lock_with(cursor: 0)
expect_to_work(job_3)
end
Expand All @@ -139,8 +145,10 @@ def expect_to_lock_with(cursor:)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql
end
let(:now) {Time.now}

before do
allow(Time).to receive(:now).and_return(Time.now + 1)
allow(Time).to receive(:now).and_return(now)
end

describe ".with_locked_job" do
Expand Down Expand Up @@ -173,7 +181,8 @@ def expect_to_work(job)
end
def query(cursor = 0)
QueJob.select(:job_id, :queue, :priority, :run_at, :job_class, :retryable, :args, :error_count)
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, Time.now)
.select("extract(epoch from (now() - run_at)) as latency")
.where("queue = ? AND job_id >= ? AND run_at <= ?", queue, cursor, now)
.where(retryable: true)
.order(:priority, :run_at, :job_id)
.limit(1).to_sql
Expand All @@ -191,15 +200,15 @@ def expect_to_lock_with(cursor:, locked_job: nil)
context "with no jobs to lock" do
it "scans entire table and calls block with nil job" do
expect(Que).to receive(:execute).with(query).and_call_original

with_locked_job do |job|
expect(job).to be_nil
end
end
end

context "with just one job to lock" do
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs }
let(:cursor_expiry) { 60 }

# Pretend time isn't moving, as we don't want to test cursor expiry here
Expand All @@ -220,9 +229,9 @@ def expect_to_lock_with(cursor:, locked_job: nil)
end

context "with jobs to lock" do
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1).attrs }
let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2).attrs }
let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3).attrs }
let!(:job_1) { FakeJob.enqueue(1, queue: queue, priority: 1, run_at: now - 1).attrs }
let!(:job_2) { FakeJob.enqueue(2, queue: queue, priority: 2, run_at: now - 1).attrs }
let!(:job_3) { FakeJob.enqueue(3, queue: queue, priority: 3, run_at: now - 1).attrs }

it "locks and then unlocks the most important job" do
expect_to_lock_with(cursor: 0, locked_job: job_1)
Expand Down

0 comments on commit d92a84a

Please sign in to comment.